diff --git a/DistributedLock.Core/Internal/Data/DatabaseConnection.cs b/DistributedLock.Core/Internal/Data/DatabaseConnection.cs index 2bfbe254..00a320c4 100644 --- a/DistributedLock.Core/Internal/Data/DatabaseConnection.cs +++ b/DistributedLock.Core/Internal/Data/DatabaseConnection.cs @@ -112,14 +112,14 @@ private async ValueTask DisposeOrCloseAsync(bool isDispose) finally { #if NETSTANDARD2_1 - if (!SyncViaAsync.IsSynchronous && this.InnerConnection is DbConnection dbConnection) - { - await (isDispose ? dbConnection.DisposeAsync() : dbConnection.CloseAsync().AsValueTask()).ConfigureAwait(false); - } - else - { - SyncDisposeConnection(); - } + if (!SyncViaAsync.IsSynchronous && this.InnerConnection is DbConnection dbConnection) + { + await (isDispose ? dbConnection.DisposeAsync() : dbConnection.CloseAsync().AsValueTask()).ConfigureAwait(false); + } + else + { + SyncDisposeConnection(); + } #elif NETSTANDARD2_0 || NET461 SyncDisposeConnection(); #else diff --git a/DistributedLock.Core/Internal/Data/MultiplexedConnectionLock.cs b/DistributedLock.Core/Internal/Data/MultiplexedConnectionLock.cs index 991c42b9..153fd7c4 100644 --- a/DistributedLock.Core/Internal/Data/MultiplexedConnectionLock.cs +++ b/DistributedLock.Core/Internal/Data/MultiplexedConnectionLock.cs @@ -18,12 +18,20 @@ internal sealed class MultiplexedConnectionLock : IAsyncDisposable private readonly AsyncLock _mutex = AsyncLock.Create(); private readonly Dictionary _heldLocksToKeepaliveCadences = new Dictionary(); private readonly DatabaseConnection _connection; + /// + /// Tracks whether we've successfully opened the connection. We track this explicity instead of just looking at + /// because we want to make sure we close() explicitly for every + /// open() and also we want to make sure we do not try to re-open a broken connection. + /// + private bool _connectionOpened; public MultiplexedConnectionLock(DatabaseConnection connection) { this._connection = connection; } + private bool IsConnectionBrokenNoLock => this._connectionOpened && !this._connection.CanExecuteQueries; + public async ValueTask TryAcquireAsync( string name, TimeoutValue timeout, @@ -33,8 +41,8 @@ public async ValueTask TryAcquireAsync( bool opportunistic) where TLockCookie : class { - using var mutextHandle = await this._mutex.TryAcquireAsync(opportunistic ? TimeSpan.Zero : Timeout.InfiniteTimeSpan, cancellationToken).ConfigureAwait(false); - if (mutextHandle == null) + using var mutexHandle = await this._mutex.TryAcquireAsync(opportunistic ? TimeSpan.Zero : Timeout.InfiniteTimeSpan, cancellationToken).ConfigureAwait(false); + if (mutexHandle == null) { // mutex wasn't free, so just give up Invariant.Require(opportunistic); @@ -43,6 +51,10 @@ public async ValueTask TryAcquireAsync( return new Result(MultiplexedConnectionLockRetry.Retry, canSafelyDispose: false); } + // This is technically redundant with the similar catch block below, but avoids needing to have + // to attempt a query on a connection that we know is broken. + if (opportunistic && this.IsConnectionBrokenNoLock) { return this.GetAlreadyBrokenResultNoLock(); } + try { if (this._heldLocksToKeepaliveCadences.ContainsKey(name)) @@ -53,9 +65,10 @@ public async ValueTask TryAcquireAsync( return this.GetFailureResultNoLock(isAlreadyHeld: true, opportunistic, timeout); } - if (!this._connection.CanExecuteQueries) + if (!this._connectionOpened) { await this._connection.OpenAsync(cancellationToken).ConfigureAwait(false); + this._connectionOpened = true; } var lockCookie = await strategy.TryAcquireAsync(this._connection, name, opportunistic ? TimeSpan.Zero : timeout, cancellationToken).ConfigureAwait(false); @@ -71,6 +84,11 @@ public async ValueTask TryAcquireAsync( // shortened the timeout return this.GetFailureResultNoLock(isAlreadyHeld: false, opportunistic, timeout); } + // never punish for the connection being broken already (see https://github.com/madelson/DistributedLock/issues/83) + catch when (opportunistic && this.IsConnectionBrokenNoLock) + { + return this.GetAlreadyBrokenResultNoLock(); + } finally { await this.CloseConnectionIfNeededNoLockAsync().ConfigureAwait(false); @@ -90,6 +108,11 @@ public async ValueTask GetIsInUseAsync() return mutexHandle == null || this._heldLocksToKeepaliveCadences.Count != 0; } + private Result GetAlreadyBrokenResultNoLock() => + // Retry on any already-broken connection to avoid "leaking" the killing or death of connections. We want there to be no observable + // results (other than perf) of multiplexing vs. not. + new Result(MultiplexedConnectionLockRetry.Retry, canSafelyDispose: this._heldLocksToKeepaliveCadences.Count == 0); + private Result GetFailureResultNoLock(bool isAlreadyHeld, bool opportunistic, TimeoutValue timeout) { // only opportunistic acquisitions trigger retries @@ -151,11 +174,13 @@ private async ValueTask ReleaseAsync(IDbSynchronizationStrategy connection try { result = await TryAcquireAsync(@lock, opportunistic: false).ConfigureAwait(false); + Invariant.Require(result!.Value.Retry == MultiplexedConnectionLockRetry.NoRetry, "Acquire on fresh lock should not recommend a retry"); } finally { diff --git a/DistributedLock.Tests/AbstractTestCases/Data/MultiplexingConnectionStrategyTestCases.cs b/DistributedLock.Tests/AbstractTestCases/Data/MultiplexingConnectionStrategyTestCases.cs index 924f3da5..1c7aeae0 100644 --- a/DistributedLock.Tests/AbstractTestCases/Data/MultiplexingConnectionStrategyTestCases.cs +++ b/DistributedLock.Tests/AbstractTestCases/Data/MultiplexingConnectionStrategyTestCases.cs @@ -2,6 +2,7 @@ using NUnit.Framework; using System; using System.Collections.Generic; +using System.Data.Common; using System.Diagnostics; using System.Runtime.CompilerServices; using System.Threading; @@ -122,5 +123,31 @@ async Task Test() string MakeLockName(int i) => $"{nameof(TestHighConcurrencyWithSmallPool)}_{i}"; } + + [Test] + public async Task TestBrokenConnectionDoesNotCorruptPool() + { + // This makes sure that for the Semaphore5 lock initial 4 tickets are taken with the default + // application name and therefore won't be killed + this._lockProvider.CreateLock("1"); + this._lockProvider.CreateLock("2"); + var applicationName = this._lockProvider.Strategy.SetUniqueApplicationName(); + + var lock1 = this._lockProvider.CreateLock("1"); + await using var handle1 = await lock1.AcquireAsync(); + + // kill the session + await this._lockProvider.Strategy.Db.KillSessionsAsync(applicationName); + + var lock2 = this._lockProvider.CreateLock("2"); + Assert.DoesNotThrowAsync(async () => await (await lock2.AcquireAsync()).DisposeAsync()); + + await using var handle2 = await lock2.AcquireAsync(); + Assert.DoesNotThrow(() => lock2.TryAcquire()?.Dispose()); + + Assert.Catch(() => handle1.Dispose()); + + Assert.DoesNotThrowAsync(async () => await (await lock1.AcquireAsync()).DisposeAsync()); + } } } diff --git a/DistributedLock.Tests/Infrastructure/Postgres/TestingPostgresDb.cs b/DistributedLock.Tests/Infrastructure/Postgres/TestingPostgresDb.cs index 74d90019..c4d221b3 100644 --- a/DistributedLock.Tests/Infrastructure/Postgres/TestingPostgresDb.cs +++ b/DistributedLock.Tests/Infrastructure/Postgres/TestingPostgresDb.cs @@ -43,7 +43,7 @@ public int CountActiveSessions(string applicationName) using var command = connection.CreateCommand(); command.CommandText = "SELECT COUNT(*)::int FROM pg_stat_activity WHERE application_name = @applicationName"; command.Parameters.AddWithValue("applicationName", applicationName); - return (int)command.ExecuteScalar(); + return (int)command.ExecuteScalar()!; } public IsolationLevel GetIsolationLevel(DbConnection connection)