From 8558cdc3e26a0e28c386168be519f818678aadef Mon Sep 17 00:00:00 2001 From: Brennan Date: Tue, 21 Jun 2022 14:29:01 -0700 Subject: [PATCH] Add CreateChained rate limiter API (#70739) --- .../ref/System.Threading.RateLimiting.cs | 1 + .../src/System.Threading.RateLimiting.csproj | 2 + .../ChainedPartitionedRateLimiter.cs | 248 ++++ .../DefaultPartitionedRateLimiter.cs | 276 +++++ .../RateLimiting/PartitionedRateLimiter.cs | 282 +---- .../tests/ChainedLimiterTests.cs | 1085 +++++++++++++++++ .../tests/Infrastructure/Utils.cs | 228 ++++ .../tests/PartitionedRateLimiterTests.cs | 189 +-- ...System.Threading.RateLimiting.Tests.csproj | 2 + 9 files changed, 1871 insertions(+), 442 deletions(-) create mode 100644 src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedPartitionedRateLimiter.cs create mode 100644 src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/DefaultPartitionedRateLimiter.cs create mode 100644 src/libraries/System.Threading.RateLimiting/tests/ChainedLimiterTests.cs create mode 100644 src/libraries/System.Threading.RateLimiting/tests/Infrastructure/Utils.cs diff --git a/src/libraries/System.Threading.RateLimiting/ref/System.Threading.RateLimiting.cs b/src/libraries/System.Threading.RateLimiting/ref/System.Threading.RateLimiting.cs index 7c4f99e6167b1..52458f8b1422f 100644 --- a/src/libraries/System.Threading.RateLimiting/ref/System.Threading.RateLimiting.cs +++ b/src/libraries/System.Threading.RateLimiting/ref/System.Threading.RateLimiting.cs @@ -64,6 +64,7 @@ public MetadataName(string name) { } } public static partial class PartitionedRateLimiter { + public static System.Threading.RateLimiting.PartitionedRateLimiter CreateChained(params System.Threading.RateLimiting.PartitionedRateLimiter[] limiters) { throw null; } public static System.Threading.RateLimiting.PartitionedRateLimiter Create(System.Func> partitioner, System.Collections.Generic.IEqualityComparer? equalityComparer = null) where TPartitionKey : notnull { throw null; } } public abstract partial class PartitionedRateLimiter : System.IAsyncDisposable, System.IDisposable diff --git a/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj b/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj index 90381d05e8d1e..08e4bff3800e1 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj +++ b/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj @@ -14,8 +14,10 @@ System.Threading.RateLimiting.RateLimitLease + + diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedPartitionedRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedPartitionedRateLimiter.cs new file mode 100644 index 0000000000000..45f2d5271a2b6 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ChainedPartitionedRateLimiter.cs @@ -0,0 +1,248 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace System.Threading.RateLimiting +{ + /// + /// Acquires leases from rate limiters in the order given. If a lease fails to be acquired (throwing or IsAcquired == false) + /// then the already acquired leases are disposed in reverse order and the failing lease is returned or the exception is thrown to user code. + /// + internal sealed class ChainedPartitionedRateLimiter : PartitionedRateLimiter + { + private readonly PartitionedRateLimiter[] _limiters; + private bool _disposed; + + public ChainedPartitionedRateLimiter(PartitionedRateLimiter[] limiters) + { + _limiters = limiters; + } + + public override int GetAvailablePermits(TResource resourceID) + { + ThrowIfDisposed(); + int lowestPermitCount = int.MaxValue; + foreach (PartitionedRateLimiter limiter in _limiters) + { + int permitCount = limiter.GetAvailablePermits(resourceID); + + if (permitCount < lowestPermitCount) + { + lowestPermitCount = permitCount; + } + } + + return lowestPermitCount; + } + + protected override RateLimitLease AcquireCore(TResource resourceID, int permitCount) + { + ThrowIfDisposed(); + RateLimitLease[]? leases = null; + for (int i = 0; i < _limiters.Length; i++) + { + RateLimitLease? lease = null; + Exception? exception = null; + try + { + lease = _limiters[i].Acquire(resourceID, permitCount); + } + catch (Exception ex) + { + exception = ex; + } + RateLimitLease? notAcquiredLease = CommonAcquireLogic(exception, lease, ref leases, i, _limiters.Length); + if (notAcquiredLease is not null) + { + return notAcquiredLease; + } + } + + return new CombinedRateLimitLease(leases!); + } + + protected override async ValueTask WaitAsyncCore(TResource resourceID, int permitCount, CancellationToken cancellationToken) + { + ThrowIfDisposed(); + RateLimitLease[]? leases = null; + for (int i = 0; i < _limiters.Length; i++) + { + RateLimitLease? lease = null; + Exception? exception = null; + try + { + lease = await _limiters[i].WaitAsync(resourceID, permitCount, cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + exception = ex; + } + RateLimitLease? notAcquiredLease = CommonAcquireLogic(exception, lease, ref leases, i, _limiters.Length); + if (notAcquiredLease is not null) + { + return notAcquiredLease; + } + } + + return new CombinedRateLimitLease(leases!); + } + + protected override void Dispose(bool disposing) + { + _disposed = true; + } + + private void ThrowIfDisposed() + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(ChainedPartitionedRateLimiter)); + } + } + + private static RateLimitLease? CommonAcquireLogic(Exception? ex, RateLimitLease? lease, ref RateLimitLease[]? leases, int index, int length) + { + if (ex is not null) + { + AggregateException? innerEx = CommonDispose(leases, index); + if (innerEx is not null) + { + Exception[] exceptions = new Exception[innerEx.InnerExceptions.Count + 1]; + innerEx.InnerExceptions.CopyTo(exceptions, 0); + exceptions[exceptions.Length - 1] = ex; + throw new AggregateException(exceptions); + } + throw ex; + } + + if (!lease!.IsAcquired) + { + AggregateException? innerEx = CommonDispose(leases, index); + return innerEx is not null ? throw innerEx : lease; + } + + leases ??= new RateLimitLease[length]; + leases[index] = lease; + return null; + } + + private static AggregateException? CommonDispose(RateLimitLease[]? leases, int i) + { + List? exceptions = null; + while (i > 0) + { + i--; + try + { + leases![i].Dispose(); + } + catch (Exception ex) + { + exceptions ??= new List(); + exceptions.Add(ex); + } + } + + if (exceptions is not null) + { + return new AggregateException(exceptions); + } + + return null; + } + + private sealed class CombinedRateLimitLease : RateLimitLease + { + private RateLimitLease[]? _leases; + private HashSet? _metadataNames; + + public CombinedRateLimitLease(RateLimitLease[] leases) + { + _leases = leases; + } + + public override bool IsAcquired => true; + + public override IEnumerable MetadataNames + { + get + { + if (_leases is null) + { + return Enumerable.Empty(); + } + + if (_metadataNames is null) + { + _metadataNames = new HashSet(); + foreach (RateLimitLease lease in _leases) + { + foreach (string metadataName in lease.MetadataNames) + { + _metadataNames.Add(metadataName); + } + } + } + return _metadataNames; + } + } + + public override bool TryGetMetadata(string metadataName, out object? metadata) + { + if (_leases is not null) + { + foreach (RateLimitLease lease in _leases) + { + // Use the first metadata item of a given name, ignore duplicates, we can't reliably merge arbitrary metadata + // Creating an object[] if there are multiple of the same metadataName could work, but makes consumption of metadata messy + // and makes MetadataName.Create(...) uses no longer work + if (lease.TryGetMetadata(metadataName, out metadata)) + { + return true; + } + } + } + + metadata = null; + return false; + } + + protected override void Dispose(bool disposing) + { + if (_leases is null) + { + return; + } + + List? exceptions = null; + // Dispose in reverse order + // Avoids issues where dispose might unblock a queued acquire and then the acquire fails when acquiring the next limiter. + // When disposing in reverse order there wont be any issues of unblocking an acquire that affects acquires on limiters in the chain after it + for (int i = _leases.Length - 1; i >= 0; i--) + { + try + { + _leases[i].Dispose(); + } + catch (Exception ex) + { + exceptions ??= new List(); + exceptions.Add(ex); + } + } + + _leases = null; + + if (exceptions is not null) + { + throw new AggregateException(exceptions); + } + } + } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/DefaultPartitionedRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/DefaultPartitionedRateLimiter.cs new file mode 100644 index 0000000000000..c9325ec7f3c92 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/DefaultPartitionedRateLimiter.cs @@ -0,0 +1,276 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace System.Threading.RateLimiting +{ + internal sealed class DefaultPartitionedRateLimiter : PartitionedRateLimiter where TKey : notnull + { + private readonly Func> _partitioner; + private static TimeSpan _idleTimeLimit = TimeSpan.FromSeconds(10); + + // TODO: Look at ConcurrentDictionary to try and avoid a global lock + private Dictionary> _limiters; + private bool _disposed; + private TaskCompletionSource _disposeComplete = new(TaskCreationOptions.RunContinuationsAsynchronously); + + // Used by the Timer to call TryRelenish on ReplenishingRateLimiters + // We use a separate list to avoid running TryReplenish (which might be user code) inside our lock + // And we cache the list to amortize the allocation cost to as close to 0 as we can get + private List>> _cachedLimiters = new(); + private bool _cacheInvalid; + private List _limitersToDispose = new(); + private TimerAwaitable _timer; + private Task _timerTask; + + // Use the Dictionary as the lock field so we don't need to allocate another object for a lock and have another field in the object + private object Lock => _limiters; + + public DefaultPartitionedRateLimiter(Func> partitioner, + IEqualityComparer? equalityComparer = null) + { + _limiters = new Dictionary>(equalityComparer); + _partitioner = partitioner; + + // TODO: Figure out what interval we should use + _timer = new TimerAwaitable(TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(100)); + _timerTask = RunTimer(); + } + + private async Task RunTimer() + { + _timer.Start(); + while (await _timer) + { + try + { + await Heartbeat().ConfigureAwait(false); + } + // TODO: Can we log to EventSource or somewhere? Maybe dispatch throwing the exception so it is at least an unhandled exception? + catch { } + } + _timer.Dispose(); + } + + public override int GetAvailablePermits(TResource resourceID) + { + return GetRateLimiter(resourceID).GetAvailablePermits(); + } + + protected override RateLimitLease AcquireCore(TResource resourceID, int permitCount) + { + return GetRateLimiter(resourceID).Acquire(permitCount); + } + + protected override ValueTask WaitAsyncCore(TResource resourceID, int permitCount, CancellationToken cancellationToken) + { + return GetRateLimiter(resourceID).WaitAsync(permitCount, cancellationToken); + } + + private RateLimiter GetRateLimiter(TResource resourceID) + { + RateLimitPartition partition = _partitioner(resourceID); + Lazy? limiter; + lock (Lock) + { + ThrowIfDisposed(); + if (!_limiters.TryGetValue(partition.PartitionKey, out limiter)) + { + // Using Lazy avoids calling user code (partition.Factory) inside the lock + limiter = new Lazy(() => partition.Factory(partition.PartitionKey)); + _limiters.Add(partition.PartitionKey, limiter); + // Cache is invalid now + _cacheInvalid = true; + } + } + return limiter.Value; + } + + protected override void Dispose(bool disposing) + { + if (!disposing) + { + return; + } + + bool alreadyDisposed = CommonDispose(); + + _timerTask.GetAwaiter().GetResult(); + _cachedLimiters.Clear(); + + if (alreadyDisposed) + { + _disposeComplete.Task.GetAwaiter().GetResult(); + return; + } + + List? exceptions = null; + + // Safe to access _limiters outside the lock + // The timer is no longer running and _disposed is set so anyone trying to access fields will be checking that first + foreach (KeyValuePair> limiter in _limiters) + { + try + { + limiter.Value.Value.Dispose(); + } + catch (Exception ex) + { + exceptions ??= new List(); + exceptions.Add(ex); + } + } + _limiters.Clear(); + _disposeComplete.TrySetResult(null); + + if (exceptions is not null) + { + throw new AggregateException(exceptions); + } + } + + protected override async ValueTask DisposeAsyncCore() + { + bool alreadyDisposed = CommonDispose(); + + await _timerTask.ConfigureAwait(false); + _cachedLimiters.Clear(); + + if (alreadyDisposed) + { + await _disposeComplete.Task.ConfigureAwait(false); + return; + } + + List? exceptions = null; + foreach (KeyValuePair> limiter in _limiters) + { + try + { + await limiter.Value.Value.DisposeAsync().ConfigureAwait(false); + } + catch (Exception ex) + { + exceptions ??= new List(); + exceptions.Add(ex); + } + } + _limiters.Clear(); + _disposeComplete.TrySetResult(null); + + if (exceptions is not null) + { + throw new AggregateException(exceptions); + } + } + + // This handles the common state changes that Dispose and DisposeAsync need to do, the individual limiters still need to be Disposed after this call + private bool CommonDispose() + { + lock (Lock) + { + if (_disposed) + { + return true; + } + _disposed = true; + _timer.Stop(); + } + return false; + } + + private void ThrowIfDisposed() + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(PartitionedRateLimiter)); + } + } + + private async Task Heartbeat() + { + lock (Lock) + { + if (_disposed) + { + return; + } + + // If the cache has been invalidated we need to recreate it + if (_cacheInvalid) + { + _cachedLimiters.Clear(); + _cachedLimiters.AddRange(_limiters); + } + } + + List? aggregateExceptions = null; + + // cachedLimiters is safe to use outside the lock because it is only updated by the Timer + foreach (KeyValuePair> rateLimiter in _cachedLimiters) + { + if (!rateLimiter.Value.IsValueCreated) + { + continue; + } + if (rateLimiter.Value.Value.IdleDuration is TimeSpan idleDuration && idleDuration > _idleTimeLimit) + { + lock (Lock) + { + // Check time again under lock to make sure no one calls Acquire or WaitAsync after checking the time and removing the limiter + idleDuration = rateLimiter.Value.Value.IdleDuration ?? TimeSpan.Zero; + if (idleDuration > _idleTimeLimit) + { + // Remove limiter from the lookup table and mark cache as invalid + // If a request for this partition comes in it will have to create a new limiter now + // And the next time the timer runs the cache needs to be updated to no longer have a reference to this limiter + _cacheInvalid = true; + _limiters.Remove(rateLimiter.Key); + + // We don't want to dispose inside the lock so we need to defer it + _limitersToDispose.Add(rateLimiter.Value.Value); + } + } + } + // We know the limiter can be replenished so let's attempt to replenish tokens + else if (rateLimiter.Value.Value is ReplenishingRateLimiter replenishingRateLimiter) + { + try + { + replenishingRateLimiter.TryReplenish(); + } + catch (Exception ex) + { + aggregateExceptions ??= new List(); + aggregateExceptions.Add(ex); + } + } + } + + foreach (RateLimiter limiter in _limitersToDispose) + { + try + { + await limiter.DisposeAsync().ConfigureAwait(false); + } + catch (Exception ex) + { + aggregateExceptions ??= new List(); + aggregateExceptions.Add(ex); + } + } + _limitersToDispose.Clear(); + + if (aggregateExceptions is not null) + { + throw new AggregateException(aggregateExceptions); + } + } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/PartitionedRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/PartitionedRateLimiter.cs index 71db0d425e57b..aa79d5da4f0c8 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/PartitionedRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/PartitionedRateLimiter.cs @@ -29,268 +29,38 @@ public static PartitionedRateLimiter Create { return new DefaultPartitionedRateLimiter(partitioner, equalityComparer); } - } - - internal sealed class DefaultPartitionedRateLimiter : PartitionedRateLimiter where TKey : notnull - { - private readonly Func> _partitioner; - private static TimeSpan _idleTimeLimit = TimeSpan.FromSeconds(10); - - // TODO: Look at ConcurrentDictionary to try and avoid a global lock - private Dictionary> _limiters; - private bool _disposed; - private TaskCompletionSource _disposeComplete = new(TaskCreationOptions.RunContinuationsAsynchronously); - - // Used by the Timer to call TryRelenish on ReplenishingRateLimiters - // We use a separate list to avoid running TryReplenish (which might be user code) inside our lock - // And we cache the list to amortize the allocation cost to as close to 0 as we can get - private List>> _cachedLimiters = new(); - private bool _cacheInvalid; - private List _limitersToDispose = new(); - private TimerAwaitable _timer; - private Task _timerTask; - - // Use the Dictionary as the lock field so we don't need to allocate another object for a lock and have another field in the object - private object Lock => _limiters; - - public DefaultPartitionedRateLimiter(Func> partitioner, - IEqualityComparer? equalityComparer = null) - { - _limiters = new Dictionary>(equalityComparer); - _partitioner = partitioner; - - // TODO: Figure out what interval we should use - _timer = new TimerAwaitable(TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(100)); - _timerTask = RunTimer(); - } - - private async Task RunTimer() - { - _timer.Start(); - while (await _timer) - { - try - { - await Heartbeat().ConfigureAwait(false); - } - // TODO: Can we log to EventSource or somewhere? Maybe dispatch throwing the exception so it is at least an unhandled exception? - catch { } - } - _timer.Dispose(); - } - - public override int GetAvailablePermits(TResource resourceID) - { - return GetRateLimiter(resourceID).GetAvailablePermits(); - } - - protected override RateLimitLease AcquireCore(TResource resourceID, int permitCount) - { - return GetRateLimiter(resourceID).Acquire(permitCount); - } - - protected override ValueTask WaitAsyncCore(TResource resourceID, int permitCount, CancellationToken cancellationToken) - { - return GetRateLimiter(resourceID).WaitAsync(permitCount, cancellationToken); - } - - private RateLimiter GetRateLimiter(TResource resourceID) - { - RateLimitPartition partition = _partitioner(resourceID); - Lazy? limiter; - lock (Lock) - { - ThrowIfDisposed(); - if (!_limiters.TryGetValue(partition.PartitionKey, out limiter)) - { - // Using Lazy avoids calling user code (partition.Factory) inside the lock - limiter = new Lazy(() => partition.Factory(partition.PartitionKey)); - _limiters.Add(partition.PartitionKey, limiter); - // Cache is invalid now - _cacheInvalid = true; - } - } - return limiter.Value; - } - - protected override void Dispose(bool disposing) - { - if (!disposing) - { - return; - } - - bool alreadyDisposed = CommonDispose(); - - _timerTask.GetAwaiter().GetResult(); - _cachedLimiters.Clear(); - - if (alreadyDisposed) - { - _disposeComplete.Task.GetAwaiter().GetResult(); - return; - } - - List? exceptions = null; - - // Safe to access _limiters outside the lock - // The timer is no longer running and _disposed is set so anyone trying to access fields will be checking that first - foreach (KeyValuePair> limiter in _limiters) - { - try - { - limiter.Value.Value.Dispose(); - } - catch (Exception ex) - { - exceptions ??= new List(); - exceptions.Add(ex); - } - } - _limiters.Clear(); - _disposeComplete.TrySetResult(null); - if (exceptions is not null) - { - throw new AggregateException(exceptions); - } - } - - protected override async ValueTask DisposeAsyncCore() - { - bool alreadyDisposed = CommonDispose(); - - await _timerTask.ConfigureAwait(false); - _cachedLimiters.Clear(); - - if (alreadyDisposed) - { - await _disposeComplete.Task.ConfigureAwait(false); - return; - } - - List? exceptions = null; - foreach (KeyValuePair> limiter in _limiters) - { - try - { - await limiter.Value.Value.DisposeAsync().ConfigureAwait(false); - } - catch (Exception ex) - { - exceptions ??= new List(); - exceptions.Add(ex); - } - } - _limiters.Clear(); - _disposeComplete.TrySetResult(null); - - if (exceptions is not null) - { - throw new AggregateException(exceptions); - } - } - - // This handles the common state changes that Dispose and DisposeAsync need to do, the individual limiters still need to be Disposed after this call - private bool CommonDispose() - { - lock (Lock) - { - if (_disposed) - { - return true; - } - _disposed = true; - _timer.Stop(); - } - return false; - } - - private void ThrowIfDisposed() - { - if (_disposed) - { - throw new ObjectDisposedException(nameof(PartitionedRateLimiter)); - } - } - - private async Task Heartbeat() + /// + /// Creates a single that wraps the passed in s. + /// + /// + /// + /// Methods on the returned will iterate over the passed in in the order given. + /// + /// + /// will return the lowest value of all the . + /// + /// + /// s returned will aggregate metadata and for duplicates use the value of the first lease with the same metadata name. + /// + /// + /// The resource type that is being rate limited. + /// The s that will be called in order when acquiring resources. + /// + /// is a null parameter. + /// is an empty array. + public static PartitionedRateLimiter CreateChained( + params PartitionedRateLimiter[] limiters) { - lock (Lock) + if (limiters is null) { - if (_disposed) - { - return; - } - - // If the cache has been invalidated we need to recreate it - if (_cacheInvalid) - { - _cachedLimiters.Clear(); - _cachedLimiters.AddRange(_limiters); - } - } - - List? aggregateExceptions = null; - - // cachedLimiters is safe to use outside the lock because it is only updated by the Timer - foreach (KeyValuePair> rateLimiter in _cachedLimiters) - { - if (!rateLimiter.Value.IsValueCreated) - { - continue; - } - if (rateLimiter.Value.Value.IdleDuration is TimeSpan idleDuration && idleDuration > _idleTimeLimit) - { - lock (Lock) - { - // Check time again under lock to make sure no one calls Acquire or WaitAsync after checking the time and removing the limiter - idleDuration = rateLimiter.Value.Value.IdleDuration ?? TimeSpan.Zero; - if (idleDuration > _idleTimeLimit) - { - // Remove limiter from the lookup table and mark cache as invalid - // If a request for this partition comes in it will have to create a new limiter now - // And the next time the timer runs the cache needs to be updated to no longer have a reference to this limiter - _cacheInvalid = true; - _limiters.Remove(rateLimiter.Key); - - // We don't want to dispose inside the lock so we need to defer it - _limitersToDispose.Add(rateLimiter.Value.Value); - } - } - } - else if (rateLimiter.Value.Value is ReplenishingRateLimiter replenishingRateLimiter) - { - try - { - replenishingRateLimiter.TryReplenish(); - } - catch (Exception ex) - { - aggregateExceptions ??= new List(); - aggregateExceptions.Add(ex); - } - } + throw new ArgumentNullException(nameof(limiters)); } - - foreach (RateLimiter limiter in _limitersToDispose) - { - try - { - await limiter.DisposeAsync().ConfigureAwait(false); - } - catch (Exception ex) - { - aggregateExceptions ??= new List(); - aggregateExceptions.Add(ex); - } - } - _limitersToDispose.Clear(); - - if (aggregateExceptions is not null) + if (limiters.Length == 0) { - throw new AggregateException(aggregateExceptions); + throw new ArgumentException("Must pass in at least 1 limiter.", nameof(limiters)); } + return new ChainedPartitionedRateLimiter(limiters); } } } diff --git a/src/libraries/System.Threading.RateLimiting/tests/ChainedLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/ChainedLimiterTests.cs new file mode 100644 index 0000000000000..d16b5b525acaa --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/tests/ChainedLimiterTests.cs @@ -0,0 +1,1085 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Xunit; + +namespace System.Threading.RateLimiting.Tests +{ + public class ChainedLimiterTests + { + [Fact] + public void ThrowsWhenNoLimitersProvided() + { + Assert.Throws(() => PartitionedRateLimiter.CreateChained()); + Assert.Throws(() => PartitionedRateLimiter.CreateChained(new PartitionedRateLimiter[0])); + } + + [Fact] + public void ThrowsWhenNullPassedIn() + { + Assert.Throws(() => PartitionedRateLimiter.CreateChained(null)); + } + + [Fact] + public async Task DisposeMakesMethodsThrow() + { + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.CreateConcurrencyLimiter(1, _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 0)); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.CreateConcurrencyLimiter(1, _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 0)); + }); + var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + + chainedLimiter.Dispose(); + + Assert.Throws(() => chainedLimiter.GetAvailablePermits("")); + Assert.Throws(() => chainedLimiter.Acquire("")); + await Assert.ThrowsAsync(async () => await chainedLimiter.WaitAsync("")); + } + + [Fact] + public async Task DisposeAsyncMakesMethodsThrow() + { + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.CreateConcurrencyLimiter(1, _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 0)); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.CreateConcurrencyLimiter(1, _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 0)); + }); + var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + + await chainedLimiter.DisposeAsync(); + + Assert.Throws(() => chainedLimiter.GetAvailablePermits("")); + Assert.Throws(() => chainedLimiter.Acquire("")); + await Assert.ThrowsAsync(async () => await chainedLimiter.WaitAsync("")); + } + + [Fact] + public void AvailablePermitsReturnsLowestValue() + { + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.CreateConcurrencyLimiter(1, _ => new ConcurrencyLimiterOptions(34, QueueProcessingOrder.NewestFirst, 0)); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.CreateConcurrencyLimiter(1, _ => new ConcurrencyLimiterOptions(22, QueueProcessingOrder.NewestFirst, 0)); + }); + using var limiter3 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.CreateConcurrencyLimiter(1, _ => new ConcurrencyLimiterOptions(13, QueueProcessingOrder.NewestFirst, 0)); + }); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2, limiter3); + Assert.Equal(13, chainedLimiter.GetAvailablePermits("")); + } + + [Fact] + public void AvailablePermitsWithSingleLimiterWorks() + { + using var limiter = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.CreateConcurrencyLimiter(1, _ => new ConcurrencyLimiterOptions(34, QueueProcessingOrder.NewestFirst, 0)); + }); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter); + Assert.Equal(34, chainedLimiter.GetAvailablePermits("")); + } + + [Fact] + public void AcquireWorksWithSingleLimiter() + { + var limiterFactory = new TrackingRateLimiterFactory(); + using var limiter = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key)); + }); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter); + using var lease = chainedLimiter.Acquire(""); + + Assert.True(lease.IsAcquired); + Assert.Single(limiterFactory.Limiters); + Assert.Equal(1, limiterFactory.Limiters[0].Key); + Assert.Equal(1, limiterFactory.Limiters[0].Limiter.AcquireCallCount); + } + + [Fact] + public async Task WaitAsyncWorksWithSingleLimiter() + { + var limiterFactory = new TrackingRateLimiterFactory(); + using var limiter = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key)); + }); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter); + using var lease = await chainedLimiter.WaitAsync(""); + + Assert.True(lease.IsAcquired); + Assert.Single(limiterFactory.Limiters); + Assert.Equal(1, limiterFactory.Limiters[0].Key); + Assert.Equal(1, limiterFactory.Limiters[0].Limiter.WaitAsyncCallCount); + } + + [Fact] + public void AcquireWorksWithMultipleLimiters() + { + var limiterFactory = new TrackingRateLimiterFactory(); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key)); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(2, key => limiterFactory.GetLimiter(key)); + }); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + using var lease = chainedLimiter.Acquire(""); + + Assert.True(lease.IsAcquired); + Assert.Equal(2, limiterFactory.Limiters.Count); + Assert.Equal(1, limiterFactory.Limiters[0].Key); + Assert.Equal(1, limiterFactory.Limiters[0].Limiter.AcquireCallCount); + Assert.Equal(2, limiterFactory.Limiters[1].Key); + Assert.Equal(1, limiterFactory.Limiters[1].Limiter.AcquireCallCount); + } + + [Fact] + public async Task WaitAsyncWorksWithMultipleLimiters() + { + var limiterFactory = new TrackingRateLimiterFactory(); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key)); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(2, key => limiterFactory.GetLimiter(key)); + }); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + using var lease = await chainedLimiter.WaitAsync(""); + + Assert.True(lease.IsAcquired); + Assert.Equal(2, limiterFactory.Limiters.Count); + Assert.Equal(1, limiterFactory.Limiters[0].Key); + Assert.Equal(1, limiterFactory.Limiters[0].Limiter.WaitAsyncCallCount); + Assert.Equal(2, limiterFactory.Limiters[1].Key); + Assert.Equal(1, limiterFactory.Limiters[1].Limiter.WaitAsyncCallCount); + } + + [Fact] + public void AcquireLeaseCorrectlyDisposesWithMultipleLimiters() + { + var concurrencyLimiter1 = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 0)); + var concurrencyLimiter2 = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 0)); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => concurrencyLimiter1); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(2, key => concurrencyLimiter2); + }); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + var lease = chainedLimiter.Acquire(""); + + Assert.True(lease.IsAcquired); + Assert.Equal(0, concurrencyLimiter1.GetAvailablePermits()); + Assert.Equal(0, concurrencyLimiter2.GetAvailablePermits()); + + lease.Dispose(); + Assert.Equal(1, concurrencyLimiter1.GetAvailablePermits()); + Assert.Equal(1, concurrencyLimiter2.GetAvailablePermits()); + } + + [Fact] + public async Task WaitAsyncLeaseCorrectlyDisposesWithMultipleLimiters() + { + var concurrencyLimiter1 = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 0)); + var concurrencyLimiter2 = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 0)); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => concurrencyLimiter1); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(2, key => concurrencyLimiter2); + }); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + var lease = await chainedLimiter.WaitAsync(""); + + Assert.True(lease.IsAcquired); + Assert.Equal(0, concurrencyLimiter1.GetAvailablePermits()); + Assert.Equal(0, concurrencyLimiter2.GetAvailablePermits()); + + lease.Dispose(); + Assert.Equal(1, concurrencyLimiter1.GetAvailablePermits()); + Assert.Equal(1, concurrencyLimiter2.GetAvailablePermits()); + } + + [Fact] + public void AcquireLeaseCorrectlyDisposesWithSingleLimiter() + { + var concurrencyLimiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 0)); + using var limiter = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => concurrencyLimiter); + }); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter); + var lease = chainedLimiter.Acquire(""); + + Assert.True(lease.IsAcquired); + Assert.Equal(0, concurrencyLimiter.GetAvailablePermits()); + + lease.Dispose(); + Assert.Equal(1, concurrencyLimiter.GetAvailablePermits()); + } + + [Fact] + public async Task WaitAsyncLeaseCorrectlyDisposesWithSingleLimiter() + { + var concurrencyLimiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 0)); + using var limiter = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => concurrencyLimiter); + }); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter); + var lease = await chainedLimiter.WaitAsync(""); + + Assert.True(lease.IsAcquired); + Assert.Equal(0, concurrencyLimiter.GetAvailablePermits()); + + lease.Dispose(); + Assert.Equal(1, concurrencyLimiter.GetAvailablePermits()); + } + + [Fact] + public void AcquireFailsWhenOneLimiterDoesNotHaveEnoughResources() + { + var limiterFactory = new TrackingRateLimiterFactory(); + using var concurrencyLimiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 0)); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key)); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(2, key => concurrencyLimiter); + }); + + // Acquire the only permit on the ConcurrencyLimiter so the chained limiter fails when calling acquire + var concurrencyLease = concurrencyLimiter.Acquire(); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + using var lease = chainedLimiter.Acquire(""); + + Assert.False(lease.IsAcquired); + Assert.Single(limiterFactory.Limiters); + Assert.Equal(1, limiterFactory.Limiters[0].Key); + Assert.Equal(1, limiterFactory.Limiters[0].Limiter.AcquireCallCount); + } + + [Fact] + public async Task WaitAsyncFailsWhenOneLimiterDoesNotHaveEnoughResources() + { + var limiterFactory = new TrackingRateLimiterFactory(); + using var concurrencyLimiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 0)); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key)); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(2, key => concurrencyLimiter); + }); + + // Acquire the only permit on the ConcurrencyLimiter so the chained limiter fails when calling acquire + var concurrencyLease = await concurrencyLimiter.WaitAsync(); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + using var lease = chainedLimiter.Acquire(""); + + Assert.False(lease.IsAcquired); + Assert.Single(limiterFactory.Limiters); + Assert.Equal(1, limiterFactory.Limiters[0].Key); + Assert.Equal(1, limiterFactory.Limiters[0].Limiter.AcquireCallCount); + } + + [Fact] + public void AcquireFailsAndReleasesAcquiredResources() + { + using var concurrencyLimiter1 = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 0)); + using var concurrencyLimiter2 = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 0)); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => concurrencyLimiter1); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(2, key => concurrencyLimiter2); + }); + + // Acquire the only permit on the ConcurrencyLimiter so the chained limiter fails when calling acquire + var concurrencyLease = concurrencyLimiter2.Acquire(); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + using var lease = chainedLimiter.Acquire(""); + + Assert.False(lease.IsAcquired); + Assert.Equal(1, concurrencyLimiter1.GetAvailablePermits()); + } + + [Fact] + public async Task WaitAsyncFailsAndReleasesAcquiredResources() + { + using var concurrencyLimiter1 = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 0)); + using var concurrencyLimiter2 = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 0)); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => concurrencyLimiter1); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(2, key => concurrencyLimiter2); + }); + + // Acquire the only permit on the ConcurrencyLimiter so the chained limiter fails when calling acquire + var concurrencyLease = await concurrencyLimiter2.WaitAsync(); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + using var lease = chainedLimiter.Acquire(""); + + Assert.False(lease.IsAcquired); + Assert.Equal(1, concurrencyLimiter1.GetAvailablePermits()); + } + + [Fact] + public void AcquireThrowsAndReleasesAcquiredResources() + { + using var concurrencyLimiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 0)); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => concurrencyLimiter); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(2, key => new NotImplementedLimiter()); + }); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + Assert.Throws(() => chainedLimiter.Acquire("")); + Assert.Equal(1, concurrencyLimiter.GetAvailablePermits()); + } + + [Fact] + public async Task WaitAsyncThrowsAndReleasesAcquiredResources() + { + using var concurrencyLimiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 0)); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => concurrencyLimiter); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(2, key => new NotImplementedLimiter()); + }); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + await Assert.ThrowsAsync(async () => await chainedLimiter.WaitAsync("")); + Assert.Equal(1, concurrencyLimiter.GetAvailablePermits()); + } + + [Fact] + public void AcquireThrows_SingleLimiter() + { + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => new NotImplementedLimiter()); + }); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1); + Assert.Throws(() => chainedLimiter.Acquire("")); + } + + [Fact] + public async Task WaitAsyncThrows_SingleLimiter() + { + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => new NotImplementedLimiter()); + }); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1); + await Assert.ThrowsAsync(async () => await chainedLimiter.WaitAsync("")); + } + + internal sealed class ThrowDisposeLease : RateLimitLease + { + public override bool IsAcquired => true; + + public override IEnumerable MetadataNames => throw new NotImplementedException(); + + public override bool TryGetMetadata(string metadataName, out object? metadata) => throw new NotImplementedException(); + + protected override void Dispose(bool disposing) => throw new NotImplementedException(); + } + + [Fact] + public void AcquireFailsDisposeThrows() + { + using var concurrencyLimiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 0)); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => new CustomizableLimiter() { AcquireCoreImpl = _ => new ThrowDisposeLease() }); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => concurrencyLimiter); + }); + + var lease = concurrencyLimiter.Acquire(); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + var ex = Assert.Throws(() => chainedLimiter.Acquire("")); + Assert.Single(ex.InnerExceptions); + Assert.IsType(ex.InnerException); + } + + [Fact] + public async Task WaitAsyncFailsDisposeThrows() + { + using var concurrencyLimiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 0)); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => new CustomizableLimiter() { AcquireCoreImpl = _ => new ThrowDisposeLease() }); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => concurrencyLimiter); + }); + + var lease = await concurrencyLimiter.WaitAsync(); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + var ex = Assert.Throws(() => chainedLimiter.Acquire("")); + Assert.Single(ex.InnerExceptions); + Assert.IsType(ex.InnerException); + } + + [Fact] + public void AcquireFailsDisposeThrowsMultipleLimitersThrow() + { + using var concurrencyLimiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 0)); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => new CustomizableLimiter() { AcquireCoreImpl = _ => new ThrowDisposeLease() }); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => new CustomizableLimiter() { AcquireCoreImpl = _ => new ThrowDisposeLease() }); + }); + using var limiter3 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => concurrencyLimiter); + }); + + var lease = concurrencyLimiter.Acquire(); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2, limiter3); + var ex = Assert.Throws(() => chainedLimiter.Acquire("")); + Assert.Equal(2, ex.InnerExceptions.Count); + Assert.IsType(ex.InnerExceptions[0]); + Assert.IsType(ex.InnerExceptions[1]); + } + + [Fact] + public async Task WaitAsyncFailsDisposeThrowsMultipleLimitersThrow() + { + using var concurrencyLimiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 0)); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => new CustomizableLimiter() { WaitAsyncCoreImpl = (_, _) => new ValueTask(new ThrowDisposeLease()) }); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => new CustomizableLimiter() { WaitAsyncCoreImpl = (_, _) => new ValueTask(new ThrowDisposeLease()) }); + }); + using var limiter3 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => concurrencyLimiter); + }); + + var lease = concurrencyLimiter.Acquire(); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2, limiter3); + var ex = await Assert.ThrowsAsync(async () => await chainedLimiter.WaitAsync("")); + Assert.Equal(2, ex.InnerExceptions.Count); + Assert.IsType(ex.InnerExceptions[0]); + Assert.IsType(ex.InnerExceptions[1]); + } + + [Fact] + public void AcquireThrowsDisposeThrowsMultipleLimitersThrow() + { + using var concurrencyLimiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 0)); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => new CustomizableLimiter() { AcquireCoreImpl = _ => new ThrowDisposeLease() }); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => new CustomizableLimiter() { AcquireCoreImpl = _ => new ThrowDisposeLease() }); + }); + using var limiter3 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => new NotImplementedLimiter()); + }); + + var lease = concurrencyLimiter.Acquire(); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2, limiter3); + var ex = Assert.Throws(() => chainedLimiter.Acquire("")); + Assert.Equal(3, ex.InnerExceptions.Count); + Assert.IsType(ex.InnerExceptions[0]); + Assert.IsType(ex.InnerExceptions[1]); + Assert.IsType(ex.InnerExceptions[2]); + } + + [Fact] + public async Task WaitAsyncThrowsDisposeThrowsMultipleLimitersThrow() + { + using var concurrencyLimiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 0)); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => new CustomizableLimiter() { WaitAsyncCoreImpl = (_, _) => new ValueTask(new ThrowDisposeLease()) }); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => new CustomizableLimiter() { WaitAsyncCoreImpl = (_, _) => new ValueTask(new ThrowDisposeLease()) }); + }); + using var limiter3 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => new NotImplementedLimiter()); + }); + + var lease = concurrencyLimiter.Acquire(); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2, limiter3); + var ex = await Assert.ThrowsAsync(async () => await chainedLimiter.WaitAsync("")); + Assert.Equal(3, ex.InnerExceptions.Count); + Assert.IsType(ex.InnerExceptions[0]); + Assert.IsType(ex.InnerExceptions[1]); + Assert.IsType(ex.InnerExceptions[2]); + } + + [Fact] + public void AcquireSucceedsDisposeThrowsAndReleasesResources() + { + using var concurrencyLimiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 0)); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => new CustomizableLimiter() { AcquireCoreImpl = _ => new ThrowDisposeLease() }); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => concurrencyLimiter); + }); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + var lease = chainedLimiter.Acquire(""); + Assert.True(lease.IsAcquired); + Assert.Equal(0, concurrencyLimiter.GetAvailablePermits()); + var ex = Assert.Throws(() => lease.Dispose()); + Assert.Single(ex.InnerExceptions); + Assert.IsType(ex.InnerException); + + Assert.Equal(1, concurrencyLimiter.GetAvailablePermits()); + } + + [Fact] + public async Task WaitAsyncSucceedsDisposeThrowsAndReleasesResources() + { + using var concurrencyLimiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 0)); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => new CustomizableLimiter() { WaitAsyncCoreImpl = (_, _) => new ValueTask(new ThrowDisposeLease()) }); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => concurrencyLimiter); + }); + + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + var lease = await chainedLimiter.WaitAsync(""); + Assert.True(lease.IsAcquired); + Assert.Equal(0, concurrencyLimiter.GetAvailablePermits()); + var ex = Assert.Throws(() => lease.Dispose()); + Assert.Single(ex.InnerExceptions); + Assert.IsType(ex.InnerException); + + Assert.Equal(1, concurrencyLimiter.GetAvailablePermits()); + } + + [Fact] + public void AcquireForwardsCorrectPermitCount() + { + using var concurrencyLimiter1 = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(5, QueueProcessingOrder.OldestFirst, 0)); + using var concurrencyLimiter2 = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(3, QueueProcessingOrder.OldestFirst, 0)); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => concurrencyLimiter1); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => concurrencyLimiter2); + }); + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + + var lease = chainedLimiter.Acquire("", 3); + Assert.True(lease.IsAcquired); + Assert.Equal(2, concurrencyLimiter1.GetAvailablePermits()); + Assert.Equal(0, concurrencyLimiter2.GetAvailablePermits()); + + lease.Dispose(); + Assert.Equal(5, concurrencyLimiter1.GetAvailablePermits()); + Assert.Equal(3, concurrencyLimiter2.GetAvailablePermits()); + } + + [Fact] + public async Task WaitAsyncForwardsCorrectPermitCount() + { + using var concurrencyLimiter1 = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(5, QueueProcessingOrder.OldestFirst, 0)); + using var concurrencyLimiter2 = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(3, QueueProcessingOrder.OldestFirst, 0)); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => concurrencyLimiter1); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => concurrencyLimiter2); + }); + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + + var lease = await chainedLimiter.WaitAsync("", 3); + Assert.True(lease.IsAcquired); + Assert.Equal(2, concurrencyLimiter1.GetAvailablePermits()); + Assert.Equal(0, concurrencyLimiter2.GetAvailablePermits()); + + lease.Dispose(); + Assert.Equal(5, concurrencyLimiter1.GetAvailablePermits()); + Assert.Equal(3, concurrencyLimiter2.GetAvailablePermits()); + } + + [Fact] + public void AcquireForwardsCorrectResourceID() + { + var limiterFactory = new TrackingRateLimiterFactory(); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + if (resource == "1") + { + return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key)); + } + return RateLimitPartition.Create(2, key => limiterFactory.GetLimiter(key)); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + if (resource == "1") + { + return RateLimitPartition.Create(3, key => limiterFactory.GetLimiter(key)); + } + return RateLimitPartition.Create(4, key => limiterFactory.GetLimiter(key)); + }); + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + + var lease = chainedLimiter.Acquire("1"); + Assert.True(lease.IsAcquired); + Assert.Equal(2, limiterFactory.Limiters.Count); + Assert.Equal(1, limiterFactory.Limiters[0].Key); + Assert.Equal(3, limiterFactory.Limiters[1].Key); + } + + [Fact] + public async Task WaitAsyncForwardsCorrectResourceID() + { + var limiterFactory = new TrackingRateLimiterFactory(); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + if (resource == "1") + { + return RateLimitPartition.Create(1, key => limiterFactory.GetLimiter(key)); + } + return RateLimitPartition.Create(2, key => limiterFactory.GetLimiter(key)); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + if (resource == "1") + { + return RateLimitPartition.Create(3, key => limiterFactory.GetLimiter(key)); + } + return RateLimitPartition.Create(4, key => limiterFactory.GetLimiter(key)); + }); + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + + var lease = await chainedLimiter.WaitAsync("1"); + Assert.True(lease.IsAcquired); + Assert.Equal(2, limiterFactory.Limiters.Count); + Assert.Equal(1, limiterFactory.Limiters[0].Key); + Assert.Equal(3, limiterFactory.Limiters[1].Key); + } + + [Fact] + public async Task WaitAsyncCanBeCanceled() + { + using var limiter = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.CreateConcurrencyLimiter(1, key => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1)); + }); + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter); + + var lease = chainedLimiter.Acquire(""); + Assert.True(lease.IsAcquired); + + var cts = new CancellationTokenSource(); + var task = chainedLimiter.WaitAsync("", 1, cts.Token); + + cts.Cancel(); + await Assert.ThrowsAsync(async () => await task); + } + + [Fact] + public async Task WaitAsyncCanceledReleasesAcquiredResources() + { + var concurrencyLimiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(2, QueueProcessingOrder.OldestFirst, 0)); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => concurrencyLimiter); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.CreateConcurrencyLimiter(1, key => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1)); + }); + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + + var lease = chainedLimiter.Acquire(""); + Assert.True(lease.IsAcquired); + Assert.Equal(1, concurrencyLimiter.GetAvailablePermits()); + + var cts = new CancellationTokenSource(); + var task = chainedLimiter.WaitAsync("", 1, cts.Token); + + Assert.Equal(0, concurrencyLimiter.GetAvailablePermits()); + cts.Cancel(); + await Assert.ThrowsAsync(async () => await task); + Assert.Equal(1, concurrencyLimiter.GetAvailablePermits()); + } + + [Fact] + public async Task WaitAsyncWaitsForResourcesBeforeCallingNextLimiter() + { + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.CreateConcurrencyLimiter(1, key => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1)); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + // 0 queue limit to verify this isn't called while the previous limiter is waiting for resource(s) + // as it would return a failed lease when no queue is available + return RateLimitPartition.CreateConcurrencyLimiter(1, key => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 0)); + }); + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + + var lease = chainedLimiter.Acquire(""); + Assert.True(lease.IsAcquired); + + var task = chainedLimiter.WaitAsync(""); + Assert.False(task.IsCompleted); + + lease.Dispose(); + lease = await task; + Assert.True(lease.IsAcquired); + } + + [Fact] + public void LeasesAreDisposedInReverseOrder() + { + var customizableLimiter1 = new CustomizableLimiter(); + var customizableLimiter2 = new CustomizableLimiter(); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => customizableLimiter1); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => customizableLimiter2); + }); + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + + var customizableLease1 = new CustomizableLease(); + var disposeCalled = false; + customizableLease1.DisposeImpl = _ => + { + Assert.True(disposeCalled); + }; + customizableLimiter1.AcquireCoreImpl = _ => customizableLease1; + + var customizableLease2 = new CustomizableLease(); + customizableLease2.DisposeImpl = _ => + { + disposeCalled = true; + }; + customizableLimiter2.AcquireCoreImpl = _ => customizableLease2; + + var lease = chainedLimiter.Acquire(""); + Assert.True(lease.IsAcquired); + + lease.Dispose(); + } + + [Fact] + public void LeasesAreDisposedInReverseOrderWhenAcquireThrows() + { + var customizableLimiter1 = new CustomizableLimiter(); + var customizableLimiter2 = new CustomizableLimiter(); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => customizableLimiter1); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => customizableLimiter2); + }); + using var limiter3 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => new NotImplementedLimiter()); + }); + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2, limiter3); + + var customizableLease1 = new CustomizableLease(); + var disposeCalled = false; + customizableLease1.DisposeImpl = _ => + { + Assert.True(disposeCalled); + }; + customizableLimiter1.AcquireCoreImpl = _ => customizableLease1; + + var customizableLease2 = new CustomizableLease(); + customizableLease2.DisposeImpl = _ => + { + disposeCalled = true; + }; + customizableLimiter2.AcquireCoreImpl = _ => customizableLease2; + + Assert.Throws(() => chainedLimiter.Acquire("")); + } + + [Fact] + public async Task LeasesAreDisposedInReverseOrderWhenWaitAsyncThrows() + { + var customizableLimiter1 = new CustomizableLimiter(); + var customizableLimiter2 = new CustomizableLimiter(); + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => customizableLimiter1); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => customizableLimiter2); + }); + using var limiter3 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => new NotImplementedLimiter()); + }); + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2, limiter3); + + var customizableLease1 = new CustomizableLease(); + var disposeCalled = false; + customizableLease1.DisposeImpl = _ => + { + Assert.True(disposeCalled); + }; + customizableLimiter1.WaitAsyncCoreImpl = (_, _) => new ValueTask(customizableLease1); + + var customizableLease2 = new CustomizableLease(); + customizableLease2.DisposeImpl = _ => + { + disposeCalled = true; + }; + customizableLimiter2.WaitAsyncCoreImpl = (_, _) => new ValueTask(customizableLease2); + + await Assert.ThrowsAsync(async () => await chainedLimiter.WaitAsync("")); + } + + [Fact] + public void MetadataIsCombined() + { + var customizableLimiter1 = new CustomizableLimiter(); + customizableLimiter1.AcquireCoreImpl = _ => new CustomizableLease() + { + MetadataNamesImpl = () => + { + return new[] { "1", "2" }; + }, + TryGetMetadataImpl = (string name, out object? metadata) => + { + if (name == "1") + { + metadata = new DateTime(); + return true; + } + if (name == "2") + { + metadata = new TimeSpan(); + return true; + } + metadata = null; + return false; + } + }; + var customizableLimiter2 = new CustomizableLimiter(); + customizableLimiter2.AcquireCoreImpl = _ => new CustomizableLease() + { + MetadataNamesImpl = () => + { + return new[] { "3", "4" }; + }, + TryGetMetadataImpl = (string name, out object? metadata) => + { + if (name == "3") + { + metadata = new Exception(); + return true; + } + if (name == "4") + { + metadata = new List(); + return true; + } + metadata = null; + return false; + } + }; + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => customizableLimiter1); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => customizableLimiter2); + }); + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + + var lease = chainedLimiter.Acquire(""); + + var metaDataNames = lease.MetadataNames.ToArray(); + Assert.Equal(4, metaDataNames.Length); + Assert.Contains("1", metaDataNames); + Assert.Contains("2", metaDataNames); + Assert.Contains("3", metaDataNames); + Assert.Contains("4", metaDataNames); + + Assert.True(lease.TryGetMetadata("1", out var obj)); + Assert.IsType(obj); + Assert.True(lease.TryGetMetadata("2", out obj)); + Assert.IsType(obj); + Assert.True(lease.TryGetMetadata("3", out obj)); + Assert.IsType(obj); + Assert.True(lease.TryGetMetadata("4", out obj)); + Assert.IsType>(obj); + } + + [Fact] + public void DuplicateMetadataUsesFirstOne() + { + var customizableLimiter1 = new CustomizableLimiter(); + customizableLimiter1.AcquireCoreImpl = _ => new CustomizableLease() + { + MetadataNamesImpl = () => + { + return new[] { "1", "2" }; + }, + TryGetMetadataImpl = (string name, out object? metadata) => + { + if (name == "1") + { + metadata = new DateTime(); + return true; + } + if (name == "2") + { + metadata = new TimeSpan(); + return true; + } + metadata = null; + return false; + } + }; + var customizableLimiter2 = new CustomizableLimiter(); + customizableLimiter2.AcquireCoreImpl = _ => new CustomizableLease() + { + MetadataNamesImpl = () => + { + return new[] { "1", "3" }; + }, + TryGetMetadataImpl = (string name, out object? metadata) => + { + // duplicate metadata name, previous one will win + if (name == "1") + { + metadata = new Exception(); + return true; + } + if (name == "3") + { + metadata = new List(); + return true; + } + metadata = null; + return false; + } + }; + using var limiter1 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => customizableLimiter1); + }); + using var limiter2 = PartitionedRateLimiter.Create(resource => + { + return RateLimitPartition.Create(1, key => customizableLimiter2); + }); + using var chainedLimiter = PartitionedRateLimiter.CreateChained(limiter1, limiter2); + + var lease = chainedLimiter.Acquire(""); + + var metadataNames = lease.MetadataNames.ToArray(); + Assert.Equal(3, metadataNames.Length); + Assert.Contains("1", metadataNames); + Assert.Contains("2", metadataNames); + Assert.Contains("3", metadataNames); + + Assert.True(lease.TryGetMetadata("1", out var obj)); + Assert.IsType(obj); + Assert.True(lease.TryGetMetadata("2", out obj)); + Assert.IsType(obj); + Assert.True(lease.TryGetMetadata("3", out obj)); + Assert.IsType>(obj); + } + } +} diff --git a/src/libraries/System.Threading.RateLimiting/tests/Infrastructure/Utils.cs b/src/libraries/System.Threading.RateLimiting/tests/Infrastructure/Utils.cs new file mode 100644 index 0000000000000..f82d5ec3987a5 --- /dev/null +++ b/src/libraries/System.Threading.RateLimiting/tests/Infrastructure/Utils.cs @@ -0,0 +1,228 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Xunit; + +namespace System.Threading.RateLimiting.Tests +{ + internal static class Utils + { + internal static Func StopTimerAndGetTimerFunc(PartitionedRateLimiter limiter) + { + var innerTimer = limiter.GetType().GetField("_timer", Reflection.BindingFlags.NonPublic | Reflection.BindingFlags.Instance); + Assert.NotNull(innerTimer); + var timerStopMethod = innerTimer.FieldType.GetMethod("Stop"); + Assert.NotNull(timerStopMethod); + // Stop the current Timer so it doesn't fire unexpectedly + timerStopMethod.Invoke(innerTimer.GetValue(limiter), Array.Empty()); + + // Create a new Timer object so that disposing the PartitionedRateLimiter doesn't fail with an ODE, but this new Timer wont actually do anything + var timerCtor = innerTimer.FieldType.GetConstructor(new Type[] { typeof(TimeSpan), typeof(TimeSpan) }); + Assert.NotNull(timerCtor); + var newTimer = timerCtor.Invoke(new object[] { TimeSpan.FromMinutes(10), TimeSpan.FromMinutes(10) }); + Assert.NotNull(newTimer); + innerTimer.SetValue(limiter, newTimer); + + var timerLoopMethod = limiter.GetType().GetMethod("Heartbeat", Reflection.BindingFlags.NonPublic | Reflection.BindingFlags.Instance); + Assert.NotNull(timerLoopMethod); + return () => (Task)timerLoopMethod.Invoke(limiter, Array.Empty()); + } + } + + internal sealed class NotImplementedPartitionedRateLimiter : PartitionedRateLimiter + { + public override int GetAvailablePermits(T resourceID) => throw new NotImplementedException(); + protected override RateLimitLease AcquireCore(T resourceID, int permitCount) => throw new NotImplementedException(); + protected override ValueTask WaitAsyncCore(T resourceID, int permitCount, CancellationToken cancellationToken) => throw new NotImplementedException(); + } + + internal sealed class TrackingRateLimiter : RateLimiter + { + private int _getAvailablePermitsCallCount; + private int _acquireCallCount; + private int _waitAsyncCallCount; + private int _disposeCallCount; + private int _disposeAsyncCallCount; + + public int GetAvailablePermitsCallCount => _getAvailablePermitsCallCount; + public int AcquireCallCount => _acquireCallCount; + public int WaitAsyncCallCount => _waitAsyncCallCount; + public int DisposeCallCount => _disposeCallCount; + public int DisposeAsyncCallCount => _disposeAsyncCallCount; + + public override TimeSpan? IdleDuration => null; + + public override int GetAvailablePermits() + { + Interlocked.Increment(ref _getAvailablePermitsCallCount); + return 1; + } + + protected override RateLimitLease AcquireCore(int permitCount) + { + Interlocked.Increment(ref _acquireCallCount); + return new Lease(); + } + + protected override ValueTask WaitAsyncCore(int permitCount, CancellationToken cancellationToken) + { + Interlocked.Increment(ref _waitAsyncCallCount); + return new ValueTask(new Lease()); + } + + protected override void Dispose(bool disposing) + { + Interlocked.Increment(ref _disposeCallCount); + } + + protected override ValueTask DisposeAsyncCore() + { + Interlocked.Increment(ref _disposeAsyncCallCount); + return new ValueTask(); + } + + private sealed class Lease : RateLimitLease + { + public override bool IsAcquired => true; + + public override IEnumerable MetadataNames => throw new NotImplementedException(); + + public override bool TryGetMetadata(string metadataName, out object? metadata) => throw new NotImplementedException(); + } + } + + internal sealed class TrackingRateLimiterFactory + { + public List<(TKey Key, TrackingRateLimiter Limiter)> Limiters { get; } = new(); + + public RateLimiter GetLimiter(TKey key) + { + TrackingRateLimiter limiter; + lock (Limiters) + { + limiter = new TrackingRateLimiter(); + Limiters.Add((key, limiter)); + } + return limiter; + } + } + + internal sealed class TestEquality : IEqualityComparer + { + private int _equalsCallCount; + private int _getHashCodeCallCount; + + public int EqualsCallCount => _equalsCallCount; + public int GetHashCodeCallCount => _getHashCodeCallCount; + + public bool Equals(int x, int y) + { + Interlocked.Increment(ref _equalsCallCount); + return x == y; + } + public int GetHashCode([DisallowNull] int obj) + { + Interlocked.Increment(ref _getHashCodeCallCount); + return obj.GetHashCode(); + } + } + + internal sealed class NotImplementedLimiter : RateLimiter + { + public override TimeSpan? IdleDuration => throw new NotImplementedException(); + + public override int GetAvailablePermits() => throw new NotImplementedException(); + protected override RateLimitLease AcquireCore(int permitCount) => throw new NotImplementedException(); + protected override ValueTask WaitAsyncCore(int permitCount, CancellationToken cancellationToken) => throw new NotImplementedException(); + } + + internal sealed class CustomizableLimiter : RateLimiter + { + public Func IdleDurationImpl { get; set; } = () => null; + public override TimeSpan? IdleDuration => IdleDurationImpl(); + + public Func GetAvailablePermitsImpl { get; set; } = () => throw new NotImplementedException(); + public override int GetAvailablePermits() => GetAvailablePermitsImpl(); + + public Func AcquireCoreImpl { get; set; } = _ => new Lease(); + protected override RateLimitLease AcquireCore(int permitCount) => AcquireCoreImpl(permitCount); + + public Func> WaitAsyncCoreImpl { get; set; } = (_, _) => new ValueTask(new Lease()); + protected override ValueTask WaitAsyncCore(int permitCount, CancellationToken cancellationToken) => WaitAsyncCoreImpl(permitCount, cancellationToken); + + public Action DisposeImpl { get; set; } = _ => { }; + protected override void Dispose(bool disposing) => DisposeImpl(disposing); + + public Func DisposeAsyncCoreImpl { get; set; } = () => default; + protected override ValueTask DisposeAsyncCore() => DisposeAsyncCoreImpl(); + + private sealed class Lease : RateLimitLease + { + public override bool IsAcquired => true; + + public override IEnumerable MetadataNames => throw new NotImplementedException(); + + public override bool TryGetMetadata(string metadataName, out object? metadata) => throw new NotImplementedException(); + } + } + + internal sealed class CustomizableReplenishingLimiter : ReplenishingRateLimiter + { + public Func IdleDurationImpl { get; set; } = () => null; + public override TimeSpan? IdleDuration => IdleDurationImpl(); + + public Func GetAvailablePermitsImpl { get; set; } = () => throw new NotImplementedException(); + public override int GetAvailablePermits() => GetAvailablePermitsImpl(); + + public Func AcquireCoreImpl { get; set; } = _ => new Lease(); + protected override RateLimitLease AcquireCore(int permitCount) => AcquireCoreImpl(permitCount); + + public Func> WaitAsyncCoreImpl { get; set; } = (_, _) => new ValueTask(new Lease()); + protected override ValueTask WaitAsyncCore(int permitCount, CancellationToken cancellationToken) => WaitAsyncCoreImpl(permitCount, cancellationToken); + + public Func DisposeAsyncCoreImpl { get; set; } = () => default; + protected override ValueTask DisposeAsyncCore() => DisposeAsyncCoreImpl(); + + public override bool IsAutoReplenishing => false; + + public override TimeSpan ReplenishmentPeriod => throw new NotImplementedException(); + + public Func TryReplenishImpl { get; set; } = () => true; + public override bool TryReplenish() => TryReplenishImpl(); + + private sealed class Lease : RateLimitLease + { + public override bool IsAcquired => true; + + public override IEnumerable MetadataNames => throw new NotImplementedException(); + + public override bool TryGetMetadata(string metadataName, out object? metadata) => throw new NotImplementedException(); + } + } + + internal sealed class CustomizableLease : RateLimitLease + { + public Func IsAcquiredImpl = () => true; + public override bool IsAcquired => IsAcquiredImpl(); + + public Func> MetadataNamesImpl = () => Enumerable.Empty(); + public override IEnumerable MetadataNames => MetadataNamesImpl(); + + public delegate bool TryGetMetadataDelegate(string metadataName, out object? metadata); + public TryGetMetadataDelegate TryGetMetadataImpl = (string name, out object? metadata) => + { + metadata = null; + return false; + }; + public override bool TryGetMetadata(string metadataName, out object? metadata) => TryGetMetadataImpl(metadataName, out metadata); + + public Action DisposeImpl = _ => { }; + protected override void Dispose(bool disposing) => DisposeImpl(disposing); + } +} diff --git a/src/libraries/System.Threading.RateLimiting/tests/PartitionedRateLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/PartitionedRateLimiterTests.cs index 2818876d203d5..09318de2cf8bd 100644 --- a/src/libraries/System.Threading.RateLimiting/tests/PartitionedRateLimiterTests.cs +++ b/src/libraries/System.Threading.RateLimiting/tests/PartitionedRateLimiterTests.cs @@ -462,7 +462,7 @@ public async Task IdleLimiterIsCleanedUp() }); }); - var timerLoopMethod = StopTimerAndGetTimerFunc(limiter); + var timerLoopMethod = Utils.StopTimerAndGetTimerFunc(limiter); var lease = limiter.Acquire(""); Assert.True(lease.IsAcquired); @@ -514,7 +514,7 @@ public async Task AllIdleLimitersCleanedUp_DisposeThrows() } }); - var timerLoopMethod = StopTimerAndGetTimerFunc(limiter); + var timerLoopMethod = Utils.StopTimerAndGetTimerFunc(limiter); var lease = limiter.Acquire("1"); Assert.True(lease.IsAcquired); @@ -569,7 +569,7 @@ public async Task ThrowingTryReplenishDoesNotPreventIdleLimiterBeingCleanedUp() }); }); - var timerLoopMethod = StopTimerAndGetTimerFunc(limiter); + var timerLoopMethod = Utils.StopTimerAndGetTimerFunc(limiter); // Add the replenishing limiter to the internal storage limiter.Acquire("2"); @@ -595,188 +595,5 @@ public async Task ThrowingTryReplenishDoesNotPreventIdleLimiterBeingCleanedUp() // Wait for Timer to run again which will see the throwing TryReplenish and an idle limiter it needs to clean-up await disposeTcs.Task; } - - internal sealed class NotImplementedPartitionedRateLimiter : PartitionedRateLimiter - { - public override int GetAvailablePermits(T resourceID) => throw new NotImplementedException(); - protected override RateLimitLease AcquireCore(T resourceID, int permitCount) => throw new NotImplementedException(); - protected override ValueTask WaitAsyncCore(T resourceID, int permitCount, CancellationToken cancellationToken) => throw new NotImplementedException(); - } - - internal sealed class TrackingRateLimiter : RateLimiter - { - private int _getAvailablePermitsCallCount; - private int _acquireCallCount; - private int _waitAsyncCallCount; - private int _disposeCallCount; - private int _disposeAsyncCallCount; - - public int GetAvailablePermitsCallCount => _getAvailablePermitsCallCount; - public int AcquireCallCount => _acquireCallCount; - public int WaitAsyncCallCount => _waitAsyncCallCount; - public int DisposeCallCount => _disposeCallCount; - public int DisposeAsyncCallCount => _disposeAsyncCallCount; - - public override TimeSpan? IdleDuration => null; - - public override int GetAvailablePermits() - { - Interlocked.Increment(ref _getAvailablePermitsCallCount); - return 1; - } - - protected override RateLimitLease AcquireCore(int permitCount) - { - Interlocked.Increment(ref _acquireCallCount); - return new Lease(); - } - - protected override ValueTask WaitAsyncCore(int permitCount, CancellationToken cancellationToken) - { - Interlocked.Increment(ref _waitAsyncCallCount); - return new ValueTask(new Lease()); - } - - protected override void Dispose(bool disposing) - { - Interlocked.Increment(ref _disposeCallCount); - } - - protected override ValueTask DisposeAsyncCore() - { - Interlocked.Increment(ref _disposeAsyncCallCount); - return new ValueTask(); - } - - private sealed class Lease : RateLimitLease - { - public override bool IsAcquired => throw new NotImplementedException(); - - public override IEnumerable MetadataNames => throw new NotImplementedException(); - - public override bool TryGetMetadata(string metadataName, out object? metadata) => throw new NotImplementedException(); - } - } - - internal sealed class TrackingRateLimiterFactory - { - public List<(TKey Key, TrackingRateLimiter Limiter)> Limiters { get; } = new(); - - public RateLimiter GetLimiter(TKey key) - { - TrackingRateLimiter limiter; - lock (Limiters) - { - limiter = new TrackingRateLimiter(); - Limiters.Add((key, limiter)); - } - return limiter; - } - } - - internal sealed class TestEquality : IEqualityComparer - { - private int _equalsCallCount; - private int _getHashCodeCallCount; - - public int EqualsCallCount => _equalsCallCount; - public int GetHashCodeCallCount => _getHashCodeCallCount; - - public bool Equals(int x, int y) - { - Interlocked.Increment(ref _equalsCallCount); - return x == y; - } - public int GetHashCode([DisallowNull] int obj) - { - Interlocked.Increment(ref _getHashCodeCallCount); - return obj.GetHashCode(); - } - } - - internal sealed class CustomizableLimiter : RateLimiter - { - public Func IdleDurationImpl { get; set; } = () => null; - public override TimeSpan? IdleDuration => IdleDurationImpl(); - - public Func GetAvailablePermitsImpl { get; set; } = () => throw new NotImplementedException(); - public override int GetAvailablePermits() => GetAvailablePermitsImpl(); - - public Func AcquireCoreImpl { get; set; } = _ => new Lease(); - protected override RateLimitLease AcquireCore(int permitCount) => AcquireCoreImpl(permitCount); - - public Func> WaitAsyncCoreImpl { get; set; } = (_, _) => new ValueTask(new Lease()); - protected override ValueTask WaitAsyncCore(int permitCount, CancellationToken cancellationToken) => WaitAsyncCoreImpl(permitCount, cancellationToken); - - public Action DisposeImpl { get; set; } = _ => { }; - protected override void Dispose(bool disposing) => DisposeImpl(disposing); - - public Func DisposeAsyncCoreImpl { get; set; } = () => default; - protected override ValueTask DisposeAsyncCore() => DisposeAsyncCoreImpl(); - - private sealed class Lease : RateLimitLease - { - public override bool IsAcquired => true; - - public override IEnumerable MetadataNames => throw new NotImplementedException(); - - public override bool TryGetMetadata(string metadataName, out object? metadata) => throw new NotImplementedException(); - } - } - - internal sealed class CustomizableReplenishingLimiter : ReplenishingRateLimiter - { - public Func IdleDurationImpl { get; set; } = () => null; - public override TimeSpan? IdleDuration => IdleDurationImpl(); - - public Func GetAvailablePermitsImpl { get; set; } = () => throw new NotImplementedException(); - public override int GetAvailablePermits() => GetAvailablePermitsImpl(); - - public Func AcquireCoreImpl { get; set; } = _ => new Lease(); - protected override RateLimitLease AcquireCore(int permitCount) => AcquireCoreImpl(permitCount); - - public Func> WaitAsyncCoreImpl { get; set; } = (_, _) => new ValueTask(new Lease()); - protected override ValueTask WaitAsyncCore(int permitCount, CancellationToken cancellationToken) => WaitAsyncCoreImpl(permitCount, cancellationToken); - - public Func DisposeAsyncCoreImpl { get; set; } = () => default; - protected override ValueTask DisposeAsyncCore() => DisposeAsyncCoreImpl(); - - public override bool IsAutoReplenishing => false; - - public override TimeSpan ReplenishmentPeriod => throw new NotImplementedException(); - - public Func TryReplenishImpl { get; set; } = () => true; - public override bool TryReplenish() => TryReplenishImpl(); - - private sealed class Lease : RateLimitLease - { - public override bool IsAcquired => true; - - public override IEnumerable MetadataNames => throw new NotImplementedException(); - - public override bool TryGetMetadata(string metadataName, out object? metadata) => throw new NotImplementedException(); - } - } - - Func StopTimerAndGetTimerFunc(PartitionedRateLimiter limiter) - { - var innerTimer = limiter.GetType().GetField("_timer", Reflection.BindingFlags.NonPublic | Reflection.BindingFlags.Instance); - Assert.NotNull(innerTimer); - var timerStopMethod = innerTimer.FieldType.GetMethod("Stop"); - Assert.NotNull(timerStopMethod); - // Stop the current Timer so it doesn't fire unexpectedly - timerStopMethod.Invoke(innerTimer.GetValue(limiter), Array.Empty()); - - // Create a new Timer object so that disposing the PartitionedRateLimiter doesn't fail with an ODE, but this new Timer wont actually do anything - var timerCtor = innerTimer.FieldType.GetConstructor(new Type[] { typeof(TimeSpan), typeof(TimeSpan) }); - Assert.NotNull(timerCtor); - var newTimer = timerCtor.Invoke(new object[] { TimeSpan.FromMinutes(10), TimeSpan.FromMinutes(10) }); - Assert.NotNull(newTimer); - innerTimer.SetValue(limiter, newTimer); - - var timerLoopMethod = limiter.GetType().GetMethod("Heartbeat", Reflection.BindingFlags.NonPublic | Reflection.BindingFlags.Instance); - Assert.NotNull(timerLoopMethod); - return () => (Task)timerLoopMethod.Invoke(limiter, Array.Empty()); - } } } diff --git a/src/libraries/System.Threading.RateLimiting/tests/System.Threading.RateLimiting.Tests.csproj b/src/libraries/System.Threading.RateLimiting/tests/System.Threading.RateLimiting.Tests.csproj index ca8bf22f743f6..779279af740cd 100644 --- a/src/libraries/System.Threading.RateLimiting/tests/System.Threading.RateLimiting.Tests.csproj +++ b/src/libraries/System.Threading.RateLimiting/tests/System.Threading.RateLimiting.Tests.csproj @@ -4,12 +4,14 @@ + +