Skip to content

Commit

Permalink
Ensure that broken connection does not pollute the multiplexed connec…
Browse files Browse the repository at this point in the history
…tion pool.

Fix #83
  • Loading branch information
madelson committed Apr 24, 2021
1 parent 99afbe4 commit 1d4a526
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 16 deletions.
16 changes: 8 additions & 8 deletions DistributedLock.Core/Internal/Data/DatabaseConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ private async ValueTask DisposeOrCloseAsync(bool isDispose)
finally
{
#if NETSTANDARD2_1
if (!SyncViaAsync.IsSynchronous && this.InnerConnection is DbConnection dbConnection)
{
await (isDispose ? dbConnection.DisposeAsync() : dbConnection.CloseAsync().AsValueTask()).ConfigureAwait(false);
}
else
{
SyncDisposeConnection();
}
if (!SyncViaAsync.IsSynchronous && this.InnerConnection is DbConnection dbConnection)
{
await (isDispose ? dbConnection.DisposeAsync() : dbConnection.CloseAsync().AsValueTask()).ConfigureAwait(false);
}
else
{
SyncDisposeConnection();
}
#elif NETSTANDARD2_0 || NET461
SyncDisposeConnection();
#else
Expand Down
39 changes: 32 additions & 7 deletions DistributedLock.Core/Internal/Data/MultiplexedConnectionLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@ internal sealed class MultiplexedConnectionLock : IAsyncDisposable
private readonly AsyncLock _mutex = AsyncLock.Create();
private readonly Dictionary<string, TimeoutValue> _heldLocksToKeepaliveCadences = new Dictionary<string, TimeoutValue>();
private readonly DatabaseConnection _connection;
/// <summary>
/// Tracks whether we've successfully opened the connection. We track this explicitly instead of just looking at
/// <see cref="DatabaseConnection.CanExecuteQueries"/> because we want to make sure we close() explicitly for every
/// open() and also we want to make sure we do not try to re-open a broken connection.
/// </summary>
private bool _connectionOpened;

/// <summary>
/// Creates a lock wrapper around <paramref name="connection"/>. The connection is only stored here;
/// this constructor does not open it.
/// </summary>
/// <param name="connection">The connection used by this instance.</param>
public MultiplexedConnectionLock(DatabaseConnection connection)
{
this._connection = connection;
}

/// <summary>
/// True when this instance has recorded the connection as opened (<see cref="_connectionOpened"/>) but the
/// connection reports that it can no longer execute queries.
/// NOTE(review): the "NoLock" suffix presumably means the caller must already hold <see cref="_mutex"/> - confirm.
/// </summary>
private bool IsConnectionBrokenNoLock => this._connectionOpened && !this._connection.CanExecuteQueries;

public async ValueTask<Result> TryAcquireAsync<TLockCookie>(
string name,
TimeoutValue timeout,
Expand All @@ -33,8 +41,8 @@ public async ValueTask<Result> TryAcquireAsync<TLockCookie>(
bool opportunistic)
where TLockCookie : class
{
using var mutextHandle = await this._mutex.TryAcquireAsync(opportunistic ? TimeSpan.Zero : Timeout.InfiniteTimeSpan, cancellationToken).ConfigureAwait(false);
if (mutextHandle == null)
using var mutexHandle = await this._mutex.TryAcquireAsync(opportunistic ? TimeSpan.Zero : Timeout.InfiniteTimeSpan, cancellationToken).ConfigureAwait(false);
if (mutexHandle == null)
{
// mutex wasn't free, so just give up
Invariant.Require(opportunistic);
Expand All @@ -43,6 +51,10 @@ public async ValueTask<Result> TryAcquireAsync<TLockCookie>(
return new Result(MultiplexedConnectionLockRetry.Retry, canSafelyDispose: false);
}

// This is technically redundant with the similar catch block below, but it avoids having to
// attempt a query on a connection that we already know is broken.
if (opportunistic && this.IsConnectionBrokenNoLock) { return this.GetAlreadyBrokenResultNoLock(); }

try
{
if (this._heldLocksToKeepaliveCadences.ContainsKey(name))
Expand All @@ -53,9 +65,10 @@ public async ValueTask<Result> TryAcquireAsync<TLockCookie>(
return this.GetFailureResultNoLock(isAlreadyHeld: true, opportunistic, timeout);
}

if (!this._connection.CanExecuteQueries)
if (!this._connectionOpened)
{
await this._connection.OpenAsync(cancellationToken).ConfigureAwait(false);
this._connectionOpened = true;
}

var lockCookie = await strategy.TryAcquireAsync(this._connection, name, opportunistic ? TimeSpan.Zero : timeout, cancellationToken).ConfigureAwait(false);
Expand All @@ -71,6 +84,11 @@ public async ValueTask<Result> TryAcquireAsync<TLockCookie>(
// shortened the timeout
return this.GetFailureResultNoLock(isAlreadyHeld: false, opportunistic, timeout);
}
// never punish for the connection being broken already (see https://github.com/madelson/DistributedLock/issues/83)
catch when (opportunistic && this.IsConnectionBrokenNoLock)
{
return this.GetAlreadyBrokenResultNoLock();
}
finally
{
await this.CloseConnectionIfNeededNoLockAsync().ConfigureAwait(false);
Expand All @@ -90,6 +108,11 @@ public async ValueTask<bool> GetIsInUseAsync()
return mutexHandle == null || this._heldLocksToKeepaliveCadences.Count != 0;
}

/// <summary>
/// Builds the result returned when the connection was found to be already broken: always a retry
/// recommendation, flagged as safe to dispose only when no locks are currently tracked as held.
/// </summary>
private Result GetAlreadyBrokenResultNoLock()
{
    // An already-broken connection always yields a retry so that the killing or death of a connection
    // does not "leak" to callers: apart from perf, multiplexing should be indistinguishable from not
    // multiplexing.
    var noLocksHeld = this._heldLocksToKeepaliveCadences.Count == 0;
    return new Result(MultiplexedConnectionLockRetry.Retry, canSafelyDispose: noLocksHeld);
}

private Result GetFailureResultNoLock(bool isAlreadyHeld, bool opportunistic, TimeoutValue timeout)
{
// only opportunistic acquisitions trigger retries
Expand Down Expand Up @@ -151,11 +174,13 @@ private async ValueTask ReleaseAsync<TLockCookie>(IDbSynchronizationStrategy<TLo
}
}

private ValueTask CloseConnectionIfNeededNoLockAsync()
private async ValueTask CloseConnectionIfNeededNoLockAsync()
{
return this._heldLocksToKeepaliveCadences.Count == 0 && this._connection.CanExecuteQueries
? this._connection.CloseAsync()
: default;
if (this._connectionOpened && this._heldLocksToKeepaliveCadences.Count == 0)
{
await this._connection.CloseAsync().ConfigureAwait(false);
this._connectionOpened = false;
}
}

private void SetKeepaliveCadenceNoLock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public MultiplexedConnectionLockPool(Func<string, DatabaseConnection> connection
try
{
result = await TryAcquireAsync(@lock, opportunistic: false).ConfigureAwait(false);
Invariant.Require(result!.Value.Retry == MultiplexedConnectionLockRetry.NoRetry, "Acquire on fresh lock should not recommend a retry");
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using NUnit.Framework;
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
Expand Down Expand Up @@ -122,5 +123,31 @@ async Task Test()

string MakeLockName(int i) => $"{nameof(TestHighConcurrencyWithSmallPool)}_{i}";
}

/// <summary>
/// Regression test for https://github.com/madelson/DistributedLock/issues/83: after the sessions running
/// under this test's unique application name are killed while a lock handle is outstanding, acquiring
/// locks through the same provider must keep working without throwing.
/// </summary>
[Test]
public async Task TestBrokenConnectionDoesNotCorruptPool()
{
// This makes sure that for the Semaphore5 lock the initial 4 tickets are taken with the default
// application name and therefore won't be killed
this._lockProvider.CreateLock("1");
this._lockProvider.CreateLock("2");
var applicationName = this._lockProvider.Strategy.SetUniqueApplicationName();

var lock1 = this._lockProvider.CreateLock("1");
await using var handle1 = await lock1.AcquireAsync();

// kill every session that uses the unique application name set above
// (presumably this includes the session backing handle1 - confirm)
await this._lockProvider.Strategy.Db.KillSessionsAsync(applicationName);

// acquiring and releasing a different lock after the kill must not throw
var lock2 = this._lockProvider.CreateLock("2");
Assert.DoesNotThrowAsync(async () => await (await lock2.AcquireAsync()).DisposeAsync());

// while lock2 is held, a TryAcquire on the same lock must not throw (a null result is tolerated via ?.)
await using var handle2 = await lock2.AcquireAsync();
Assert.DoesNotThrow(() => lock2.TryAcquire()?.Dispose());

// disposing the handle that was acquired before the kill is expected to throw
Assert.Catch(() => handle1.Dispose());

// lock1 must be acquirable (and releasable) again afterwards
Assert.DoesNotThrowAsync(async () => await (await lock1.AcquireAsync()).DisposeAsync());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public int CountActiveSessions(string applicationName)
using var command = connection.CreateCommand();
command.CommandText = "SELECT COUNT(*)::int FROM pg_stat_activity WHERE application_name = @applicationName";
command.Parameters.AddWithValue("applicationName", applicationName);
return (int)command.ExecuteScalar();
return (int)command.ExecuteScalar()!;
}

public IsolationLevel GetIsolationLevel(DbConnection connection)
Expand Down

0 comments on commit 1d4a526

Please sign in to comment.