From ba861dc70bd7e36edcfc40314128d8a531638953 Mon Sep 17 00:00:00 2001 From: Arthur Henrique Date: Mon, 2 May 2022 18:07:26 -0300 Subject: [PATCH] Use Structured Logging --- RedLockNet.SERedis/RedLock.cs | 1415 +++++++++++++++++---------------- 1 file changed, 710 insertions(+), 705 deletions(-) diff --git a/RedLockNet.SERedis/RedLock.cs b/RedLockNet.SERedis/RedLock.cs index dbc5609..b97676b 100644 --- a/RedLockNet.SERedis/RedLock.cs +++ b/RedLockNet.SERedis/RedLock.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; @@ -14,707 +14,712 @@ namespace RedLockNet.SERedis { - public class RedLock : IRedLock - { - private readonly object lockObject = new object(); - private readonly SemaphoreSlim extendUnlockSemaphore = new SemaphoreSlim(1, 1); - private readonly CancellationTokenSource unlockCancellationTokenSource = new CancellationTokenSource(); - - private readonly ICollection redisCaches; - private readonly ILogger logger; - - private readonly int quorum; - private readonly int quorumRetryCount; - private readonly int quorumRetryDelayMs; - private const double ClockDriftFactor = 0.01; - private static readonly long ClockPrecisionPaddingTicks = TimeSpan.FromMilliseconds(2).Ticks; - private bool isDisposed; - - private Timer lockKeepaliveTimer; - - private static readonly string UnlockScript = EmbeddedResourceLoader.GetEmbeddedResource("RedLockNet.SERedis.Lua.Unlock.lua"); - - // Set the expiry for the given key if its value matches the supplied value. - // Returns 1 on success, 0 on failure setting expiry or key not existing, -1 if the key value didn't match - private static readonly string ExtendIfMatchingValueScript = EmbeddedResourceLoader.GetEmbeddedResource("RedLockNet.SERedis.Lua.Extend.lua"); - - public string Resource { get; } - public string LockId { get; } - public bool IsAcquired => Status == RedLockStatus.Acquired; - public RedLockStatus Status { get; private set; } - public RedLockInstanceSummary InstanceSummary { get; private set; } - public int ExtendCount { get; private set; } - - private readonly TimeSpan expiryTime; - private readonly TimeSpan? waitTime; - private readonly TimeSpan? retryTime; - private CancellationToken cancellationToken; - - private static readonly TimeSpan MinimumExpiryTime = TimeSpan.FromMilliseconds(10); - private static readonly TimeSpan MinimumRetryTime = TimeSpan.FromMilliseconds(10); - - private const int DefaultQuorumRetryCount = 3; - private const int DefaultQuorumRetryDelayMs = 400; - - private RedLock( - ILogger logger, - ICollection redisCaches, - string resource, - TimeSpan expiryTime, - TimeSpan? waitTime = null, - TimeSpan? retryTime = null, - RedLockRetryConfiguration retryConfiguration = null, - CancellationToken? cancellationToken = null) - { - this.logger = logger; - - if (expiryTime < MinimumExpiryTime) - { - logger.LogWarning($"Expiry time {expiryTime.TotalMilliseconds}ms too low, setting to {MinimumExpiryTime.TotalMilliseconds}ms"); - expiryTime = MinimumExpiryTime; - } - - if (retryTime != null && retryTime.Value < MinimumRetryTime) - { - logger.LogWarning($"Retry time {retryTime.Value.TotalMilliseconds}ms too low, setting to {MinimumRetryTime.TotalMilliseconds}ms"); - retryTime = MinimumRetryTime; - } - - this.redisCaches = redisCaches; - - quorum = redisCaches.Count / 2 + 1; - quorumRetryCount = retryConfiguration?.RetryCount ?? DefaultQuorumRetryCount; - quorumRetryDelayMs = retryConfiguration?.RetryDelayMs ?? DefaultQuorumRetryDelayMs; - - Resource = resource; - LockId = Guid.NewGuid().ToString(); - this.expiryTime = expiryTime; - this.waitTime = waitTime; - this.retryTime = retryTime; - this.cancellationToken = cancellationToken ?? CancellationToken.None; - } - - internal static RedLock Create( - ILogger logger, - ICollection redisCaches, - string resource, - TimeSpan expiryTime, - TimeSpan? waitTime = null, - TimeSpan? retryTime = null, - RedLockRetryConfiguration retryConfiguration = null, - CancellationToken? cancellationToken = null) - { - var redisLock = new RedLock( - logger, - redisCaches, - resource, - expiryTime, - waitTime, - retryTime, - retryConfiguration, - cancellationToken); - - redisLock.Start(); - - return redisLock; - } - - internal static async Task CreateAsync( - ILogger logger, - ICollection redisCaches, - string resource, - TimeSpan expiryTime, - TimeSpan? waitTime = null, - TimeSpan? retryTime = null, - RedLockRetryConfiguration retryConfiguration = null, - CancellationToken? cancellationToken = null) - { - var redisLock = new RedLock( - logger, - redisCaches, - resource, - expiryTime, - waitTime, - retryTime, - retryConfiguration, - cancellationToken); - - await redisLock.StartAsync().ConfigureAwait(false); - - return redisLock; - } - - private void Start() - { - if (waitTime.HasValue && retryTime.HasValue && waitTime.Value.TotalMilliseconds > 0 && retryTime.Value.TotalMilliseconds > 0) - { - var stopwatch = Stopwatch.StartNew(); - - // ReSharper disable PossibleInvalidOperationException - while (!IsAcquired && stopwatch.Elapsed <= waitTime.Value) - { - (Status, InstanceSummary) = Acquire(); - - if (!IsAcquired) - { - TaskUtils.Delay(retryTime.Value, cancellationToken).Wait(cancellationToken); - } - } - // ReSharper restore PossibleInvalidOperationException - } - else - { - (Status, InstanceSummary) = Acquire(); - } - - logger.LogInformation($"Lock status: {Status} ({InstanceSummary}), {Resource} ({LockId})"); - - if (IsAcquired) - { - StartAutoExtendTimer(); - } - } - - private async Task StartAsync() - { - if (waitTime.HasValue && retryTime.HasValue && waitTime.Value.TotalMilliseconds > 0 && retryTime.Value.TotalMilliseconds > 0) - { - var stopwatch = Stopwatch.StartNew(); - - // ReSharper disable PossibleInvalidOperationException - while (!IsAcquired && stopwatch.Elapsed <= waitTime.Value) - { - (Status, InstanceSummary) = await AcquireAsync().ConfigureAwait(false); - - if (!IsAcquired) - { - await TaskUtils.Delay(retryTime.Value, cancellationToken).ConfigureAwait(false); - } - } - // ReSharper restore PossibleInvalidOperationException - } - else - { - (Status, InstanceSummary) = await AcquireAsync().ConfigureAwait(false); - } - - logger.LogInformation($"Lock status: {Status} ({InstanceSummary}), {Resource} ({LockId})"); - - if (IsAcquired) - { - StartAutoExtendTimer(); - } - } - - private (RedLockStatus, RedLockInstanceSummary) Acquire() - { - var lockSummary = new RedLockInstanceSummary(); - - for (var i = 0; i < quorumRetryCount; i++) - { - cancellationToken.ThrowIfCancellationRequested(); - - var iteration = i + 1; - logger.LogDebug($"Lock attempt {iteration}/{quorumRetryCount}: {Resource} ({LockId}), expiry: {expiryTime}"); - - var stopwatch = Stopwatch.StartNew(); - - lockSummary = Lock(); - - var validityTicks = GetRemainingValidityTicks(stopwatch); - - logger.LogDebug($"Acquired locks for {Resource} ({LockId}) in {lockSummary.Acquired}/{redisCaches.Count} instances, quorum: {quorum}, validityTicks: {validityTicks}"); - - if (lockSummary.Acquired >= quorum && validityTicks > 0) - { - return (RedLockStatus.Acquired, lockSummary); - } - - // we failed to get enough locks for a quorum, unlock everything and try again - Unlock(); - - // only sleep if we have more retries left - if (i < quorumRetryCount - 1) - { - var sleepMs = ThreadSafeRandom.Next(quorumRetryDelayMs); - - logger.LogDebug($"Sleeping {sleepMs}ms"); - - TaskUtils.Delay(sleepMs, cancellationToken).Wait(cancellationToken); - } - } - - var status = GetFailedRedLockStatus(lockSummary); - - // give up - logger.LogDebug($"Could not acquire quorum after {quorumRetryCount} attempts, giving up: {Resource} ({LockId}). {lockSummary}."); - - return (status, lockSummary); - } - - private async Task<(RedLockStatus, RedLockInstanceSummary)> AcquireAsync() - { - var lockSummary = new RedLockInstanceSummary(); - - for (var i = 0; i < quorumRetryCount; i++) - { - cancellationToken.ThrowIfCancellationRequested(); - - var iteration = i + 1; - logger.LogDebug($"Lock attempt {iteration}/{quorumRetryCount}: {Resource} ({LockId}), expiry: {expiryTime}"); - - var stopwatch = Stopwatch.StartNew(); - - lockSummary = await LockAsync().ConfigureAwait(false); - - var validityTicks = GetRemainingValidityTicks(stopwatch); - - logger.LogDebug($"Acquired locks for {Resource} ({LockId}) in {lockSummary.Acquired}/{redisCaches.Count} instances, quorum: {quorum}, validityTicks: {validityTicks}"); - - if (lockSummary.Acquired >= quorum && validityTicks > 0) - { - return (RedLockStatus.Acquired, lockSummary); - } - - // we failed to get enough locks for a quorum, unlock everything and try again - await UnlockAsync().ConfigureAwait(false); - - // only sleep if we have more retries left - if (i < quorumRetryCount - 1) - { - var sleepMs = ThreadSafeRandom.Next(quorumRetryDelayMs); - - logger.LogDebug($"Sleeping {sleepMs}ms"); - - await TaskUtils.Delay(sleepMs, cancellationToken).ConfigureAwait(false); - } - } - - var status = GetFailedRedLockStatus(lockSummary); - - // give up - logger.LogDebug($"Could not acquire quorum after {quorumRetryCount} attempts, giving up: {Resource} ({LockId}). {lockSummary}."); - - return (status, lockSummary); - } - - private void StartAutoExtendTimer() - { - var interval = expiryTime.TotalMilliseconds / 2; - - logger.LogDebug($"Starting auto extend timer with {interval}ms interval"); - - lockKeepaliveTimer = new Timer( - state => { ExtendLockLifetime(); }, - null, - (int) interval, - (int) interval); - } - - private void ExtendLockLifetime() - { - try - { - var gotSemaphore = extendUnlockSemaphore.Wait(0, unlockCancellationTokenSource.Token); - try - { - if (!gotSemaphore) - { - // another extend operation is still running, so skip this one - logger.LogWarning($"Lock renewal skipped due to another renewal still running: {Resource} ({LockId})"); - return; - } - - logger.LogTrace($"Lock renewal timer fired: {Resource} ({LockId})"); - - var stopwatch = Stopwatch.StartNew(); - - var extendSummary = Extend(); - - var validityTicks = GetRemainingValidityTicks(stopwatch); - - if (extendSummary.Acquired >= quorum && validityTicks > 0) - { - Status = RedLockStatus.Acquired; - InstanceSummary = extendSummary; - ExtendCount++; - - logger.LogDebug($"Extended lock, {Status} ({InstanceSummary}): {Resource} ({LockId})"); - } - else - { - Status = GetFailedRedLockStatus(extendSummary); - InstanceSummary = extendSummary; - - logger.LogWarning($"Failed to extend lock, {Status} ({InstanceSummary}): {Resource} ({LockId})"); - } - } - catch (Exception exception) - { - // All we can do here is log the exception and swallow it. - var message = $"Lock renewal timer thread failed: {Resource} ({LockId})"; - logger.LogError(null, exception, message); - } - finally - { - if (gotSemaphore) - { - extendUnlockSemaphore.Release(); - } - } - } - catch (OperationCanceledException) - { - // unlock has been called, don't extend - logger.LogDebug($"Lock renewal cancelled: {Resource} ({LockId})"); - } - } - - private long GetRemainingValidityTicks(Stopwatch sw) - { - // Add 2 milliseconds to the drift to account for Redis expires precision, - // which is 1 milliescond, plus 1 millisecond min drift for small TTLs. - var driftTicks = (long) (expiryTime.Ticks * ClockDriftFactor) + ClockPrecisionPaddingTicks; - var validityTicks = expiryTime.Ticks - sw.Elapsed.Ticks - driftTicks; - return validityTicks; - } - - private RedLockInstanceSummary Lock() - { - var lockResults = new ConcurrentBag(); - - Parallel.ForEach(redisCaches, cache => - { - lockResults.Add(LockInstance(cache)); - }); - - return PopulateRedLockResult(lockResults); - } - - private async Task LockAsync() - { - var lockTasks = redisCaches.Select(LockInstanceAsync); - - var lockResults = await TaskUtils.WhenAll(lockTasks).ConfigureAwait(false); - - return PopulateRedLockResult(lockResults); - } - - private RedLockInstanceSummary Extend() - { - var extendResults = new ConcurrentBag(); - - Parallel.ForEach(redisCaches, cache => - { - extendResults.Add(ExtendInstance(cache)); - }); - - return PopulateRedLockResult(extendResults); - } - - private void Unlock() - { - // ReSharper disable once MethodSupportsCancellation - extendUnlockSemaphore.Wait(); - try - { - Parallel.ForEach(redisCaches, UnlockInstance); - } - finally - { - extendUnlockSemaphore.Release(); - } - } - - private async Task UnlockAsync() - { - // ReSharper disable once MethodSupportsCancellation - await extendUnlockSemaphore.WaitAsync().ConfigureAwait(false); - try - { - var unlockTasks = redisCaches.Select(UnlockInstanceAsync); - - await TaskUtils.WhenAll(unlockTasks).ConfigureAwait(false); - } - finally - { - extendUnlockSemaphore.Release(); - } - } - - private RedLockInstanceResult LockInstance(RedisConnection cache) - { - var redisKey = GetRedisKey(cache.RedisKeyFormat, Resource); - var host = GetHost(cache.ConnectionMultiplexer); - - RedLockInstanceResult result; - - try - { - logger.LogTrace($"LockInstance enter {host}: {redisKey}, {LockId}, {expiryTime}"); - var redisResult = cache.ConnectionMultiplexer - .GetDatabase(cache.RedisDatabase) - .StringSet(redisKey, LockId, expiryTime, When.NotExists, CommandFlags.DemandMaster); - - result = redisResult ? RedLockInstanceResult.Success : RedLockInstanceResult.Conflicted; - } - catch (Exception ex) - { - logger.LogDebug($"Error locking lock instance {host}: {ex.Message}"); - - result = RedLockInstanceResult.Error; - } - - logger.LogTrace($"LockInstance exit {host}: {redisKey}, {LockId}, {result}"); - - return result; - } - - private async Task LockInstanceAsync(RedisConnection cache) - { - var redisKey = GetRedisKey(cache.RedisKeyFormat, Resource); - var host = GetHost(cache.ConnectionMultiplexer); - - RedLockInstanceResult result; - - try - { - logger.LogTrace($"LockInstanceAsync enter {host}: {redisKey}, {LockId}, {expiryTime}"); - var redisResult = await cache.ConnectionMultiplexer - .GetDatabase(cache.RedisDatabase) - .StringSetAsync(redisKey, LockId, expiryTime, When.NotExists, CommandFlags.DemandMaster) - .ConfigureAwait(false); - - result = redisResult ? RedLockInstanceResult.Success : RedLockInstanceResult.Conflicted; - } - catch (Exception ex) - { - logger.LogDebug($"Error locking lock instance {host}: {ex.Message}"); - - result = RedLockInstanceResult.Error; - } - - logger.LogTrace($"LockInstanceAsync exit {host}: {redisKey}, {LockId}, {result}"); - - return result; - } - - private RedLockInstanceResult ExtendInstance(RedisConnection cache) - { - var redisKey = GetRedisKey(cache.RedisKeyFormat, Resource); - var host = GetHost(cache.ConnectionMultiplexer); - - RedLockInstanceResult result; - - try - { - logger.LogTrace($"ExtendInstance enter {host}: {redisKey}, {LockId}, {expiryTime}"); - - // Returns 1 on success, 0 on failure setting expiry or key not existing, -1 if the key value didn't match - var extendResult = (long) cache.ConnectionMultiplexer - .GetDatabase(cache.RedisDatabase) - .ScriptEvaluate(ExtendIfMatchingValueScript, new RedisKey[] {redisKey}, new RedisValue[] {LockId, (long) expiryTime.TotalMilliseconds}, CommandFlags.DemandMaster); - - result = extendResult == 1 ? RedLockInstanceResult.Success - : extendResult == -1 ? RedLockInstanceResult.Conflicted - : RedLockInstanceResult.Error; - } - catch (Exception ex) - { - logger.LogDebug($"Error extending lock instance {host}: {ex.Message}"); - - result = RedLockInstanceResult.Error; - } - - logger.LogTrace($"ExtendInstance exit {host}: {redisKey}, {LockId}, {result}"); - - return result; - } - - private void UnlockInstance(RedisConnection cache) - { - var redisKey = GetRedisKey(cache.RedisKeyFormat, Resource); - var host = GetHost(cache.ConnectionMultiplexer); - - var result = false; - - try - { - logger.LogTrace($"UnlockInstance enter {host}: {redisKey}, {LockId}"); - result = (bool) cache.ConnectionMultiplexer - .GetDatabase(cache.RedisDatabase) - .ScriptEvaluate(UnlockScript, new RedisKey[] {redisKey}, new RedisValue[] {LockId}, CommandFlags.DemandMaster); - } - catch (Exception ex) - { - logger.LogDebug($"Error unlocking lock instance {host}: {ex.Message}"); - } - - logger.LogTrace($"UnlockInstance exit {host}: {redisKey}, {LockId}, {result}"); - } - - private async Task UnlockInstanceAsync(RedisConnection cache) - { - var redisKey = GetRedisKey(cache.RedisKeyFormat, Resource); - var host = GetHost(cache.ConnectionMultiplexer); - - var result = false; - - try - { - logger.LogTrace($"UnlockInstanceAsync enter {host}: {redisKey}, {LockId}"); - result = (bool) await cache.ConnectionMultiplexer - .GetDatabase(cache.RedisDatabase) - .ScriptEvaluateAsync(UnlockScript, new RedisKey[] { redisKey }, new RedisValue[] { LockId }, CommandFlags.DemandMaster) - .ConfigureAwait(false); - } - catch (Exception ex) - { - logger.LogDebug($"Error unlocking lock instance {host}: {ex.Message}"); - } - - logger.LogTrace($"UnlockInstanceAsync exit {host}: {redisKey}, {LockId}, {result}"); - - return result; - } - - private static string GetRedisKey(string redisKeyFormat, string resource) - { - return string.Format(redisKeyFormat, resource); - } - - internal static string GetHost(IConnectionMultiplexer cache) - { - var result = new StringBuilder(); - - foreach (var endPoint in cache.GetEndPoints()) - { - var server = cache.GetServer(endPoint); - - result.Append(server.EndPoint.GetFriendlyName()); - result.Append(" ("); - result.Append(server.IsSlave ? "slave" : "master"); - result.Append(server.IsConnected ? "" : ", disconnected"); - result.Append("), "); - } - - if (result.Length >= 2) - { - result.Remove(result.Length - 2, 2); - } - - return result.ToString(); - } - - public void Dispose() - { - Dispose(true); - } - - protected virtual void Dispose(bool disposing) - { - logger.LogDebug($"Disposing {Resource} ({LockId})"); - - if (isDisposed) - { - return; - } - - if (disposing) - { - StopKeepAliveTimer(); - } - - unlockCancellationTokenSource.Cancel(); - Unlock(); - - Status = RedLockStatus.Unlocked; - InstanceSummary = new RedLockInstanceSummary(); - - isDisposed = true; - } - - public ValueTask DisposeAsync() - { - return DisposeAsync(true); - } - - protected virtual async ValueTask DisposeAsync(bool disposing) - { - logger.LogDebug($"Disposing {Resource} ({LockId})"); - - if (isDisposed) - { - return; - } - - if (disposing) - { - StopKeepAliveTimer(); - } - - unlockCancellationTokenSource.Cancel(); - await UnlockAsync().ConfigureAwait(false); - - Status = RedLockStatus.Unlocked; - InstanceSummary = new RedLockInstanceSummary(); - - isDisposed = true; - } - - private RedLockStatus GetFailedRedLockStatus(RedLockInstanceSummary lockResult) - { - if (lockResult.Acquired >= quorum) - { - // if we got here with a quorum then validity must have expired - return RedLockStatus.Expired; - } - - if (lockResult.Acquired + lockResult.Conflicted >= quorum) - { - // we had enough instances for a quorum, but some were locked with another LockId - return RedLockStatus.Conflicted; - } - - return RedLockStatus.NoQuorum; - } - - private static RedLockInstanceSummary PopulateRedLockResult(IEnumerable instanceResults) - { - var acquired = 0; - var conflicted = 0; - var error = 0; - - foreach (var instanceResult in instanceResults) - { - switch (instanceResult) - { - case RedLockInstanceResult.Success: - acquired++; - break; - case RedLockInstanceResult.Conflicted: - conflicted++; - break; - case RedLockInstanceResult.Error: - error++; - break; - } - } - - return new RedLockInstanceSummary(acquired, conflicted, error); - } - - internal void StopKeepAliveTimer() - { - lock (lockObject) - { - if (lockKeepaliveTimer != null) - { - lockKeepaliveTimer.Change(Timeout.Infinite, Timeout.Infinite); - lockKeepaliveTimer.Dispose(); - lockKeepaliveTimer = null; - } - } - } - } -} \ No newline at end of file + public class RedLock : IRedLock + { + private readonly object lockObject = new object(); + private readonly SemaphoreSlim extendUnlockSemaphore = new SemaphoreSlim(1, 1); + private readonly CancellationTokenSource unlockCancellationTokenSource = new CancellationTokenSource(); + + private readonly ICollection redisCaches; + private readonly ILogger logger; + + private readonly int quorum; + private readonly int quorumRetryCount; + private readonly int quorumRetryDelayMs; + private const double ClockDriftFactor = 0.01; + private static readonly long ClockPrecisionPaddingTicks = TimeSpan.FromMilliseconds(2).Ticks; + private bool isDisposed; + + private Timer lockKeepaliveTimer; + + private static readonly string UnlockScript = EmbeddedResourceLoader.GetEmbeddedResource("RedLockNet.SERedis.Lua.Unlock.lua"); + + // Set the expiry for the given key if its value matches the supplied value. + // Returns 1 on success, 0 on failure setting expiry or key not existing, -1 if the key value didn't match + private static readonly string ExtendIfMatchingValueScript = EmbeddedResourceLoader.GetEmbeddedResource("RedLockNet.SERedis.Lua.Extend.lua"); + + public string Resource { get; } + public string LockId { get; } + public bool IsAcquired => Status == RedLockStatus.Acquired; + public RedLockStatus Status { get; private set; } + public RedLockInstanceSummary InstanceSummary { get; private set; } + public int ExtendCount { get; private set; } + + private readonly TimeSpan expiryTime; + private readonly TimeSpan? waitTime; + private readonly TimeSpan? retryTime; + private CancellationToken cancellationToken; + + private static readonly TimeSpan MinimumExpiryTime = TimeSpan.FromMilliseconds(10); + private static readonly TimeSpan MinimumRetryTime = TimeSpan.FromMilliseconds(10); + + private const int DefaultQuorumRetryCount = 3; + private const int DefaultQuorumRetryDelayMs = 400; + + private RedLock( + ILogger logger, + ICollection redisCaches, + string resource, + TimeSpan expiryTime, + TimeSpan? waitTime = null, + TimeSpan? retryTime = null, + RedLockRetryConfiguration retryConfiguration = null, + CancellationToken? cancellationToken = null) + { + this.logger = logger; + + if (expiryTime < MinimumExpiryTime) + { + logger.LogWarning("Expiry time {expiryTimeTotalMilliseconds}ms too low, setting to {minimumExpiryTimeTotalMilliseconds}ms", expiryTime.TotalMilliseconds, MinimumExpiryTime.TotalMilliseconds); + expiryTime = MinimumExpiryTime; + } + + if (retryTime != null && retryTime.Value < MinimumRetryTime) + { + logger.LogWarning("Retry time {retryTimeTotalMilliseconds}ms too low, setting to {minimumRetryTimeTotalMilliseconds}ms", retryTime.Value.TotalMilliseconds, MinimumRetryTime.TotalMilliseconds); + retryTime = MinimumRetryTime; + } + + this.redisCaches = redisCaches; + + quorum = redisCaches.Count / 2 + 1; + quorumRetryCount = retryConfiguration?.RetryCount ?? DefaultQuorumRetryCount; + quorumRetryDelayMs = retryConfiguration?.RetryDelayMs ?? DefaultQuorumRetryDelayMs; + + Resource = resource; + LockId = Guid.NewGuid().ToString(); + this.expiryTime = expiryTime; + this.waitTime = waitTime; + this.retryTime = retryTime; + this.cancellationToken = cancellationToken ?? CancellationToken.None; + } + + internal static RedLock Create( + ILogger logger, + ICollection redisCaches, + string resource, + TimeSpan expiryTime, + TimeSpan? waitTime = null, + TimeSpan? retryTime = null, + RedLockRetryConfiguration retryConfiguration = null, + CancellationToken? cancellationToken = null) + { + var redisLock = new RedLock( + logger, + redisCaches, + resource, + expiryTime, + waitTime, + retryTime, + retryConfiguration, + cancellationToken); + + redisLock.Start(); + + return redisLock; + } + + internal static async Task CreateAsync( + ILogger logger, + ICollection redisCaches, + string resource, + TimeSpan expiryTime, + TimeSpan? waitTime = null, + TimeSpan? retryTime = null, + RedLockRetryConfiguration retryConfiguration = null, + CancellationToken? cancellationToken = null) + { + var redisLock = new RedLock( + logger, + redisCaches, + resource, + expiryTime, + waitTime, + retryTime, + retryConfiguration, + cancellationToken); + + await redisLock.StartAsync().ConfigureAwait(false); + + return redisLock; + } + + private void Start() + { + if (waitTime.HasValue && retryTime.HasValue && waitTime.Value.TotalMilliseconds > 0 && retryTime.Value.TotalMilliseconds > 0) + { + var stopwatch = Stopwatch.StartNew(); + + // ReSharper disable PossibleInvalidOperationException + while (!IsAcquired && stopwatch.Elapsed <= waitTime.Value) + { + (Status, InstanceSummary) = Acquire(); + + if (!IsAcquired) + { + TaskUtils.Delay(retryTime.Value, cancellationToken).Wait(cancellationToken); + } + } + // ReSharper restore PossibleInvalidOperationException + } + else + { + (Status, InstanceSummary) = Acquire(); + } + + logger.LogInformation("Lock status: {Status} ({InstanceSummary}), {Resource} ({LockId})", Status, InstanceSummary, Resource, LockId); + + if (IsAcquired) + { + StartAutoExtendTimer(); + } + } + + private async Task StartAsync() + { + if (waitTime.HasValue && retryTime.HasValue && waitTime.Value.TotalMilliseconds > 0 && retryTime.Value.TotalMilliseconds > 0) + { + var stopwatch = Stopwatch.StartNew(); + + // ReSharper disable PossibleInvalidOperationException + while (!IsAcquired && stopwatch.Elapsed <= waitTime.Value) + { + (Status, InstanceSummary) = await AcquireAsync().ConfigureAwait(false); + + if (!IsAcquired) + { + await TaskUtils.Delay(retryTime.Value, cancellationToken).ConfigureAwait(false); + } + } + // ReSharper restore PossibleInvalidOperationException + } + else + { + (Status, InstanceSummary) = await AcquireAsync().ConfigureAwait(false); + } + + logger.LogInformation("Lock status: {Status} ({InstanceSummary}), {Resource} ({LockId})", Status, InstanceSummary, Resource, LockId); + + if (IsAcquired) + { + StartAutoExtendTimer(); + } + } + + private (RedLockStatus, RedLockInstanceSummary) Acquire() + { + var lockSummary = new RedLockInstanceSummary(); + + for (var i = 0; i < quorumRetryCount; i++) + { + cancellationToken.ThrowIfCancellationRequested(); + + var iteration = i + 1; + logger.LogDebug("Lock attempt {iteration}/{quorumRetryCount}: {Resource} ({LockId}), expiry: {expiryTime}", iteration, quorumRetryCount, Resource, LockId, expiryTime); + + var stopwatch = Stopwatch.StartNew(); + + lockSummary = Lock(); + + var validityTicks = GetRemainingValidityTicks(stopwatch); + + logger.LogDebug("Acquired locks for {Resource} ({LockId}) in {lockSummaryAcquired}/{redisCachesCount} instances, quorum: {quorum}, validityTicks: {validityTicks}", + Resource, LockId, lockSummary.Acquired, redisCaches.Count, quorum, validityTicks); + + if (lockSummary.Acquired >= quorum && validityTicks > 0) + { + return (RedLockStatus.Acquired, lockSummary); + } + + // we failed to get enough locks for a quorum, unlock everything and try again + Unlock(); + + // only sleep if we have more retries left + if (i < quorumRetryCount - 1) + { + var sleepMs = ThreadSafeRandom.Next(quorumRetryDelayMs); + + logger.LogDebug("Sleeping {sleepMs}ms", sleepMs); + + TaskUtils.Delay(sleepMs, cancellationToken).Wait(cancellationToken); + } + } + + var status = GetFailedRedLockStatus(lockSummary); + + // give up + logger.LogDebug("Could not acquire quorum after {quorumRetryCount} attempts, giving up: {Resource} ({LockId}). {lockSummary}.", + quorumRetryCount, Resource, LockId, lockSummary); + + return (status, lockSummary); + } + + private async Task<(RedLockStatus, RedLockInstanceSummary)> AcquireAsync() + { + var lockSummary = new RedLockInstanceSummary(); + + for (var i = 0; i < quorumRetryCount; i++) + { + cancellationToken.ThrowIfCancellationRequested(); + + var iteration = i + 1; + logger.LogDebug("Lock attempt {iteration}/{quorumRetryCount}: {Resource} ({LockId}), expiry: {expiryTime}", + iteration, quorumRetryCount, Resource, LockId, expiryTime); + + var stopwatch = Stopwatch.StartNew(); + + lockSummary = await LockAsync().ConfigureAwait(false); + + var validityTicks = GetRemainingValidityTicks(stopwatch); + + logger.LogDebug("Acquired locks for {Resource} ({LockId}) in {lockSummaryAcquired}/{redisCachesCount} instances, quorum: {quorum}, validityTicks: {validityTicks}", + Resource, LockId, lockSummary.Acquired, redisCaches.Count, quorum, validityTicks); + + if (lockSummary.Acquired >= quorum && validityTicks > 0) + { + return (RedLockStatus.Acquired, lockSummary); + } + + // we failed to get enough locks for a quorum, unlock everything and try again + await UnlockAsync().ConfigureAwait(false); + + // only sleep if we have more retries left + if (i < quorumRetryCount - 1) + { + var sleepMs = ThreadSafeRandom.Next(quorumRetryDelayMs); + + logger.LogDebug("Sleeping {sleepMs}ms", sleepMs); + + await TaskUtils.Delay(sleepMs, cancellationToken).ConfigureAwait(false); + } + } + + var status = GetFailedRedLockStatus(lockSummary); + + // give up + logger.LogDebug("Could not acquire quorum after {quorumRetryCount} attempts, giving up: {Resource} ({LockId}). {lockSummary}.", quorumRetryCount, Resource, LockId, lockSummary); + + return (status, lockSummary); + } + + private void StartAutoExtendTimer() + { + var interval = expiryTime.TotalMilliseconds / 2; + + logger.LogDebug("Starting auto extend timer with {interval}ms interval", interval); + + lockKeepaliveTimer = new Timer( + state => { ExtendLockLifetime(); }, + null, + (int)interval, + (int)interval); + } + + private void ExtendLockLifetime() + { + try + { + var gotSemaphore = extendUnlockSemaphore.Wait(0, unlockCancellationTokenSource.Token); + try + { + if (!gotSemaphore) + { + // another extend operation is still running, so skip this one + logger.LogWarning("Lock renewal skipped due to another renewal still running: {Resource} ({LockId})", Resource, LockId); + return; + } + + logger.LogTrace("Lock renewal timer fired: {Resource} ({LockId})", Resource, LockId); + + var stopwatch = Stopwatch.StartNew(); + + var extendSummary = Extend(); + + var validityTicks = GetRemainingValidityTicks(stopwatch); + + if (extendSummary.Acquired >= quorum && validityTicks > 0) + { + Status = RedLockStatus.Acquired; + InstanceSummary = extendSummary; + ExtendCount++; + + logger.LogDebug("Extended lock, {Status} ({InstanceSummary}): {Resource} ({LockId})", Status, InstanceSummary, Resource, LockId); + } + else + { + Status = GetFailedRedLockStatus(extendSummary); + InstanceSummary = extendSummary; + + logger.LogWarning("Failed to extend lock, {Status} ({InstanceSummary}): {Resource} ({LockId})", Status, InstanceSummary, Resource, LockId); + } + } + catch (Exception exception) + { + // All we can do here is log the exception and swallow it. + logger.LogError(exception, "Lock renewal timer thread failed: {Resource} ({LockId})", Resource, LockId); + } + finally + { + if (gotSemaphore) + { + extendUnlockSemaphore.Release(); + } + } + } + catch (OperationCanceledException) + { + // unlock has been called, don't extend + logger.LogDebug("Lock renewal cancelled: {Resource} ({LockId})", Resource, LockId); + } + } + + private long GetRemainingValidityTicks(Stopwatch sw) + { + // Add 2 milliseconds to the drift to account for Redis expires precision, + // which is 1 milliescond, plus 1 millisecond min drift for small TTLs. + var driftTicks = (long)(expiryTime.Ticks * ClockDriftFactor) + ClockPrecisionPaddingTicks; + var validityTicks = expiryTime.Ticks - sw.Elapsed.Ticks - driftTicks; + return validityTicks; + } + + private RedLockInstanceSummary Lock() + { + var lockResults = new ConcurrentBag(); + + Parallel.ForEach(redisCaches, cache => + { + lockResults.Add(LockInstance(cache)); + }); + + return PopulateRedLockResult(lockResults); + } + + private async Task LockAsync() + { + var lockTasks = redisCaches.Select(LockInstanceAsync); + + var lockResults = await TaskUtils.WhenAll(lockTasks).ConfigureAwait(false); + + return PopulateRedLockResult(lockResults); + } + + private RedLockInstanceSummary Extend() + { + var extendResults = new ConcurrentBag(); + + Parallel.ForEach(redisCaches, cache => + { + extendResults.Add(ExtendInstance(cache)); + }); + + return PopulateRedLockResult(extendResults); + } + + private void Unlock() + { + // ReSharper disable once MethodSupportsCancellation + extendUnlockSemaphore.Wait(); + try + { + Parallel.ForEach(redisCaches, UnlockInstance); + } + finally + { + extendUnlockSemaphore.Release(); + } + } + + private async Task UnlockAsync() + { + // ReSharper disable once MethodSupportsCancellation + await extendUnlockSemaphore.WaitAsync().ConfigureAwait(false); + try + { + var unlockTasks = redisCaches.Select(UnlockInstanceAsync); + + await TaskUtils.WhenAll(unlockTasks).ConfigureAwait(false); + } + finally + { + extendUnlockSemaphore.Release(); + } + } + + private RedLockInstanceResult LockInstance(RedisConnection cache) + { + var redisKey = GetRedisKey(cache.RedisKeyFormat, Resource); + var host = GetHost(cache.ConnectionMultiplexer); + + RedLockInstanceResult result; + + try + { + logger.LogTrace("LockInstance enter {host}: {redisKey}, {LockId}, {expiryTime}", host, redisKey, LockId, expiryTime); + var redisResult = cache.ConnectionMultiplexer + .GetDatabase(cache.RedisDatabase) + .StringSet(redisKey, LockId, expiryTime, When.NotExists, CommandFlags.DemandMaster); + + result = redisResult ? RedLockInstanceResult.Success : RedLockInstanceResult.Conflicted; + } + catch (Exception ex) + { + logger.LogDebug("Error locking lock instance {host}: {exMessage}", host, ex.Message); + + result = RedLockInstanceResult.Error; + } + + logger.LogTrace("LockInstance exit {host}: {redisKey}, {LockId}, {result}", host, redisKey, LockId, result); + + return result; + } + + private async Task LockInstanceAsync(RedisConnection cache) + { + var redisKey = GetRedisKey(cache.RedisKeyFormat, Resource); + var host = GetHost(cache.ConnectionMultiplexer); + + RedLockInstanceResult result; + + try + { + logger.LogTrace("LockInstanceAsync enter {host}: {redisKey}, {LockId}, {expiryTime}", host, redisKey, LockId, expiryTime); + var redisResult = await cache.ConnectionMultiplexer + .GetDatabase(cache.RedisDatabase) + .StringSetAsync(redisKey, LockId, expiryTime, When.NotExists, CommandFlags.DemandMaster) + .ConfigureAwait(false); + + result = redisResult ? RedLockInstanceResult.Success : RedLockInstanceResult.Conflicted; + } + catch (Exception ex) + { + logger.LogDebug("Error locking lock instance {host}: {exMessage}", host, ex.Message); + + result = RedLockInstanceResult.Error; + } + + logger.LogTrace("LockInstanceAsync exit {host}: {redisKey}, {LockId}, {result}", host, redisKey, LockId, result); + + return result; + } + + private RedLockInstanceResult ExtendInstance(RedisConnection cache) + { + var redisKey = GetRedisKey(cache.RedisKeyFormat, Resource); + var host = GetHost(cache.ConnectionMultiplexer); + + RedLockInstanceResult result; + + try + { + logger.LogTrace("ExtendInstance enter {host}: {redisKey}, {LockId}, {expiryTime}", host, redisKey, LockId, expiryTime); + + // Returns 1 on success, 0 on failure setting expiry or key not existing, -1 if the key value didn't match + var extendResult = (long)cache.ConnectionMultiplexer + .GetDatabase(cache.RedisDatabase) + .ScriptEvaluate(ExtendIfMatchingValueScript, new RedisKey[] { redisKey }, new RedisValue[] { LockId, (long)expiryTime.TotalMilliseconds }, CommandFlags.DemandMaster); + + result = extendResult == 1 ? RedLockInstanceResult.Success + : extendResult == -1 ? RedLockInstanceResult.Conflicted + : RedLockInstanceResult.Error; + } + catch (Exception ex) + { + logger.LogDebug("Error extending lock instance {host}: {exMessage}", host, ex.Message); + + result = RedLockInstanceResult.Error; + } + + logger.LogTrace("ExtendInstance exit {host}: {redisKey}, {LockId}, {result}", host, redisKey, LockId, result); + + return result; + } + + private void UnlockInstance(RedisConnection cache) + { + var redisKey = GetRedisKey(cache.RedisKeyFormat, Resource); + var host = GetHost(cache.ConnectionMultiplexer); + + var result = false; + + try + { + logger.LogTrace("UnlockInstance enter {host}: {redisKey}, {LockId}"); + result = (bool)cache.ConnectionMultiplexer + .GetDatabase(cache.RedisDatabase) + .ScriptEvaluate(UnlockScript, new RedisKey[] { redisKey }, new RedisValue[] { LockId }, CommandFlags.DemandMaster); + } + catch (Exception ex) + { + logger.LogDebug("Error unlocking lock instance {host}: {exMessage}", host, ex.Message); + } + + logger.LogTrace("UnlockInstance exit {host}: {redisKey}, {LockId}, {result}", host, redisKey, LockId, result); + } + + private async Task UnlockInstanceAsync(RedisConnection cache) + { + var redisKey = GetRedisKey(cache.RedisKeyFormat, Resource); + var host = GetHost(cache.ConnectionMultiplexer); + + var result = false; + + try + { + logger.LogTrace("UnlockInstanceAsync enter {host}: {redisKey}, {LockId}", + host, redisKey, LockId); + + result = (bool)await cache.ConnectionMultiplexer + .GetDatabase(cache.RedisDatabase) + .ScriptEvaluateAsync(UnlockScript, new RedisKey[] { redisKey }, new RedisValue[] { LockId }, CommandFlags.DemandMaster) + .ConfigureAwait(false); + } + catch (Exception ex) + { + logger.LogDebug("Error unlocking lock instance {host}: {exMessage}", host, ex.Message); + } + + logger.LogTrace("UnlockInstanceAsync exit {host}: {redisKey}, {LockId}, {result}", host, redisKey, LockId, result); + + return result; + } + + private static string GetRedisKey(string redisKeyFormat, string resource) + { + return string.Format(redisKeyFormat, resource); + } + + internal static string GetHost(IConnectionMultiplexer cache) + { + var result = new StringBuilder(); + + foreach (var endPoint in cache.GetEndPoints()) + { + var server = cache.GetServer(endPoint); + + result.Append(server.EndPoint.GetFriendlyName()); + result.Append(" ("); + result.Append(server.IsSlave ? "slave" : "master"); + result.Append(server.IsConnected ? "" : ", disconnected"); + result.Append("), "); + } + + if (result.Length >= 2) + { + result.Remove(result.Length - 2, 2); + } + + return result.ToString(); + } + + public void Dispose() + { + Dispose(true); + } + + protected virtual void Dispose(bool disposing) + { + logger.LogDebug("Disposing {Resource} ({LockId})", Resource, LockId); + + if (isDisposed) + { + return; + } + + if (disposing) + { + StopKeepAliveTimer(); + } + + unlockCancellationTokenSource.Cancel(); + Unlock(); + + Status = RedLockStatus.Unlocked; + InstanceSummary = new RedLockInstanceSummary(); + + isDisposed = true; + } + + public ValueTask DisposeAsync() + { + return DisposeAsync(true); + } + + protected virtual async ValueTask DisposeAsync(bool disposing) + { + logger.LogDebug("Disposing {Resource} ({LockId})", Resource, LockId); + + if (isDisposed) + { + return; + } + + if (disposing) + { + StopKeepAliveTimer(); + } + + unlockCancellationTokenSource.Cancel(); + await UnlockAsync().ConfigureAwait(false); + + Status = RedLockStatus.Unlocked; + InstanceSummary = new RedLockInstanceSummary(); + + isDisposed = true; + } + + private RedLockStatus GetFailedRedLockStatus(RedLockInstanceSummary lockResult) + { + if (lockResult.Acquired >= quorum) + { + // if we got here with a quorum then validity must have expired + return RedLockStatus.Expired; + } + + if (lockResult.Acquired + lockResult.Conflicted >= quorum) + { + // we had enough instances for a quorum, but some were locked with another LockId + return RedLockStatus.Conflicted; + } + + return RedLockStatus.NoQuorum; + } + + private static RedLockInstanceSummary PopulateRedLockResult(IEnumerable instanceResults) + { + var acquired = 0; + var conflicted = 0; + var error = 0; + + foreach (var instanceResult in instanceResults) + { + switch (instanceResult) + { + case RedLockInstanceResult.Success: + acquired++; + break; + case RedLockInstanceResult.Conflicted: + conflicted++; + break; + case RedLockInstanceResult.Error: + error++; + break; + } + } + + return new RedLockInstanceSummary(acquired, conflicted, error); + } + + internal void StopKeepAliveTimer() + { + lock (lockObject) + { + if (lockKeepaliveTimer != null) + { + lockKeepaliveTimer.Change(Timeout.Infinite, Timeout.Infinite); + lockKeepaliveTimer.Dispose(); + lockKeepaliveTimer = null; + } + } + } + } +}